
Remove actix-rt and replace with tokio tasks #42

Merged: 8 commits into LemmyNet:main, Jun 20, 2023

Conversation

@cetra3 (Contributor) commented Jun 14, 2023

This removes the background-jobs crate and replaces it with an inline activity queue that spawns tokio tasks to handle background delivery.

I haven't given this a good test yet, as it doesn't appear very easy to benchmark or stress-test the activity queue, so I'd be interested in thoughts there.

This could be built into something with a bit more persistence in the future, and the task submission could be generalised.

Resolves #38 & #32

@cetra3 force-pushed the remove_actix_rt branch 3 times, most recently from 4ad5b2a to 354b56d on June 14, 2023 05:31
@cetra3 force-pushed the remove_actix_rt branch from 354b56d to 86bce7c on June 14, 2023 06:32
impl ActivityQueue {
fn new(client: ClientWithMiddleware, timeout: Duration, worker_count: usize) -> Self {
// Keep a vec of senders to send our messages to
let mut senders = Vec::with_capacity(worker_count);
@Nutomic (Member)

This is actually something I was wondering about recently. At the moment we have lots of new users on Lemmy, and instance admins need to increase this worker count manually to keep federation synchronized. However the workers seem very lightweight; lemmy.ml is running fine with 200k at once.

So maybe you can tell me if this fixed worker count is really necessary, or if we can remove it entirely and spawn workers on demand. This would also require that unused workers get cleaned up automatically when the queue empties.

Related to this, I'm wondering how tokio workers compare to the previous actix workers: would there be any difference in CPU or memory usage?

@cetra3 (Contributor, Author)

So regarding the original implementation I don't know how concurrent it would be, considering that there isn't really any work stealing going on between threads.

I have another PR for Lemmy to use tokio instead of actix-rt to bring up the server, as I feel (and definitely need to test this with benchmarking!) that we should be using tokio's work stealing instead of actix-rt. However it panicked when I got rid of actix-rt because background-jobs uses it, hence this PR.

The reason I think this is the case is that actix-rt brings up multiple single-threaded executors, but they work independently and do not share tasks with other threads:

To achieve similar performance to multi-threaded, work-stealing runtimes, applications using actix-rt will create multiple, mostly disconnected, single-threaded runtimes. This approach has good performance characteristics for workloads where the majority of tasks have similar runtime expense.

This is great for pinning TCP sockets to specific threads and saving on context switching, but the brunt of the work Lemmy is doing is shared DB connections and this background activity delivery, which is highly variable in workload. That's why I think it's probably much better suited to a work-stealing queue, especially if we are sleeping/retrying a lot of the time.

I'll preface this by saying that I think I'll need to put together a bit of a benchmark to prove this theory out, as it's hard to know without some reference whether this will make things faster. It might make things slower!

I am not sure how concurrent the existing implementation is, given that it's running on individual single-threaded executors: while you might submit 200k tasks, only a fixed number of workers run them concurrently. For actix-rt with the defaults I believe it's one single-threaded runtime per thread.

In terms of workers on demand: yes, we can possibly look at scaling up/down dynamically, but I'm thinking a good first step is to set the number of workers as a multiple of the number of threads available, something like 128 * available_parallelism(). Once again, benchmarks appear to be necessary here. We might also need to make the reqwest connection pool match this amount.
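A rough sketch of what that could look like, purely illustrative (the function name and the 128 multiplier are just placeholders for the idea above, not tested values):

```rust
use std::thread;

// Illustrative only: derive the default worker count from the host's
// parallelism instead of hard-coding it. The 128 multiplier is the rough
// guess from the comment above and would itself need benchmarking.
fn default_worker_count() -> usize {
    128 * thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}
```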

The tokio documentation states that there is about 64 bytes of memory overhead per task. I believe these tasks are going to live on the stack now, which might mean we need to box them if there are going to be millions.

So the related question is answered by the need to benchmark both approaches.

@Nutomic (Member)

I am completely ignorant when it comes to different async runtimes; it sounds like you understand this much better. As a reference, the highest I saw was around 200,000 workers on lemmy.ml, running on an 8-CPU VPS, with the Lemmy backend taking roughly one CPU core. Postgres took everything else, and RAM was not a problem. So optimizing this is not necessary yet, but it can't hurt to make it more efficient. For benchmarking we could deploy something to lemmy.ml, as long as it won't crash.

@asonix Maybe you also have some thoughts on this.

@phiresky (Contributor) commented Jun 16, 2023

> So maybe you can tell me if this fixed worker count is really necessary, or if we can remove it entirely and spawn workers on demand

> In terms of workers on demand: yes, we can possibly look at scaling up/down dynamically, but I'm thinking a good first step is to set the number of workers as a multiple of the number of threads available

I think @Nutomic is right here.

@cetra3's code uses tokio::spawn to create a set of 64 (worker_count) tokio tasks and then round-robins the incoming tasks across those 64 workers. This looks very much like a load-leveling system built to distribute the load.

But I don't think this approach makes a lot of sense on top of tokio: Tokio already has an internal pool of worker threads that it distributes the load to. That queue has a fixed number of worker threads (by default = cpu_count).

So now you basically have a queue of 64 "fake" workers (green threads) that are scheduled on-demand on 8 real tokio worker threads, that each have their own unbounded queue of tasks to run. It seems like you're adding a second level of indirection that I don't think gives you any advantages. Having more "fake" workers on top of the cpu_count real workers shouldn't improve performance any either.

Instead I think it would make more sense to remove the round-robin and just rely on the internal tokio work queue to make the scheduling decisions. The work stealing happens on the layer of the real tokio threads, so by adding that second layer of green threads you're actually preventing work stealing between each of the 64 green threads. The reason is that you're making fixed assignments of each thing to one of the 64 queues instead of letting tokio decide where to put it.

I'd use a single mpsc queue with a single loop that reads from the mpsc queue and directly tokio::spawns a task for each item. That way this code becomes a fair bit simpler and (imo) should also be faster. Just like the example in the main tokio docs.
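A minimal sketch of that shape, assuming a hypothetical Task type and handler (not the crate's actual names):

```rust
use tokio::sync::mpsc;

// Hypothetical stand-ins for SendActivityTask and the send/retry logic.
struct Task;

async fn handle(_task: Task) {
    // sign_and_send + retry would live here
}

// One unbounded channel feeds a single receive loop, which spawns each task
// directly onto the tokio runtime and lets its scheduler do the work stealing.
// Must be called from within a tokio runtime.
fn start() -> mpsc::UnboundedSender<Task> {
    let (sender, mut receiver) = mpsc::unbounded_channel::<Task>();
    tokio::spawn(async move {
        while let Some(task) = receiver.recv().await {
            tokio::spawn(handle(task));
        }
    });
    sender
}
```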

@phiresky (Contributor) commented Jun 16, 2023

If the number of tasks should be limited I think something like this should be done instead: tokio-rs/tokio#2648 (comment)

That way the rest of the tokio runtime is not affected if there are too many pending tasks. But then the mpsc queue should also be bounded, and work has to be explicitly dropped based on some criteria (e.g. prioritize posts over comments, comments over votes). Otherwise your queue(s) will just keep getting longer and longer.
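Roughly the semaphore pattern from that issue, sketched with placeholder types (a bounded channel plus a permit acquired before each spawn):

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, Semaphore};

struct Task; // stand-in for the real activity type

async fn handle(_task: Task) {}

// Cap the number of in-flight sends without affecting the rest of the runtime:
// a permit is acquired before spawning and released when the task finishes.
async fn run(mut receiver: mpsc::Receiver<Task>, max_in_flight: usize) {
    let semaphore = Arc::new(Semaphore::new(max_in_flight));
    while let Some(task) = receiver.recv().await {
        // Waits here once max_in_flight tasks are running, which in turn
        // applies backpressure to the bounded mpsc channel feeding us.
        let permit = semaphore
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore closed");
        tokio::spawn(async move {
            handle(task).await;
            drop(permit); // release the slot
        });
    }
}
```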

@asonix

Hey there! Just commenting since I was tagged: I think this is a fine change to make. Especially because lemmy hasn't had persisted storage for background tasks. I might recommend taking a look at https://github.com/rafaelcaricio/backie if you want to grow from this inline queue to a more structured and persisted queue in the future.

I'll also say that using 200K workers with background-jobs probably wasn't a great idea, but if it worked for you then it worked lol.

src/activity_queue.rs (outdated review thread, resolved)
@Nutomic (Member) commented Jun 14, 2023

For testing I would try to write a unit test which launches a few thousand jobs, each of which sleeps a while and then increments an atomic int. Then check that the counter has reached the expected value. Plus another test where jobs keep some internal state so that they fail on the first run, to ensure that retry works.
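Something along these lines, as a rough sketch; the bare tokio::spawn stands in for whatever the queue's real submission API is, and the retry case would follow the same pattern with a handler that fails on its first attempt:

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::time::Duration;

#[tokio::test(flavor = "multi_thread")]
async fn jobs_all_run() {
    let counter = Arc::new(AtomicUsize::new(0));
    let num_jobs = 5_000;

    let mut handles = Vec::with_capacity(num_jobs);
    for _ in 0..num_jobs {
        let counter = counter.clone();
        // In the real test this would go through the activity queue rather
        // than a bare tokio::spawn.
        handles.push(tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            counter.fetch_add(1, Ordering::Relaxed);
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
    assert_eq!(counter.load(Ordering::Relaxed), num_jobs);
}
```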

@cetra3 (Contributor, Author) commented Jun 15, 2023

OK, I've wired up a pretty janky test to benchmark what's happening.

I get about 3,000 msg/s, which seems to be consistent regardless of how many workers/messages are in the queue. I've wired this test up to a flamegraph, which I generated with:

cargo flamegraph --unit-test -- test_activity_queue_workers

(flamegraph image)

The flamegraph shows that the overhead from tokio is insignificant (keeping in mind it's all requests to localhost, so no real network traversal). The majority of the time is spent doing two things:

  • Reading a PEM string and creating a Pkey<Private>
  • Signing messages

I had a look through the ActivityPub spec and the Mastodon source to find out whether we could get away with reusing signed requests. It looks as though there is a bit of slop on either side, but the window is ~1 hour (if you account for CLOCK_SKEW_MARGIN). If we had tight retry times we might get away with it, but I think the intent is that the request should be re-signed if it's retried at a later date.

Avoiding the repeated reading and parsing of the PEM string, however, is probably an easy performance win. There are a couple of paths:

  • Have some sort of caching take place for strings to PEM
  • Adjust the traits to store a PKey<Private> on them rather than a String

I think I can adjust the scope of this PR and try one or the other.

I think I like the latter option: adjust the traits and keep PKey<Private> around in its decoded form.
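Roughly the difference between the two shapes, using the openssl crate (the structs are illustrative, not the crate's actual traits):

```rust
use openssl::pkey::{PKey, Private};

// Today: carry the PEM string and re-parse it for every signed request.
struct ActorWithPem {
    private_key_pem: String,
}

// Instead: parse once up front and keep the decoded key around.
struct ActorWithKey {
    private_key: PKey<Private>,
}

impl ActorWithKey {
    fn from_pem(pem: &str) -> Result<Self, openssl::error::ErrorStack> {
        Ok(Self {
            private_key: PKey::private_key_from_pem(pem.as_bytes())?,
        })
    }
}
```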

@Nutomic (Member) commented Jun 15, 2023

HTTP signatures expire after 10 seconds, which is the default in the http-signature-normalization library. This can be changed on the sending side here, and in fact I don't see any problem with using a higher value. It might even make delivery more reliable. Here is the HTTP signature standard, by the way. At least for the first retry (after 60s) the signature could be reused, if it's not too complicated to implement.

I always prefer stricter typing, so using PKey<Private> instead of a string sounds good. This can be done in send_activity() before the loop, so it only needs to happen once when sending an activity to many inboxes (a common case), and it doesn't require any API change. Though changing ActorType::private_key_pem() to return PKey<Private> instead of a string would arguably be cleaner.

By the way #43 should also be helpful.

@cetra3 (Contributor, Author) commented Jun 16, 2023

OK, I've adjusted the expiry to be ~5 minutes and refactored some of the internals to only parse the PEM data once. This has given a boost to ~20,000 msg/s on my PC, which I'm a little happier with. I have kept the actor trait as it is; it can be refactored later if it's still proving to be a bottleneck.

I've adjusted the test server to fail every now and then to test the retry functionality.

I've also adjusted the activity payload from a String to Bytes so that it's more cheaply cloneable, since the payload is actually shared between inboxes etc. I have also condensed the JSON by not using the pretty-printing methods, which should save a few bytes.
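For context on why Bytes helps here, a tiny illustration (serde_json::to_vec is the compact, non-pretty writer):

```rust
use bytes::Bytes;
use serde_json::json;

fn main() {
    // Serialize once with the compact writer, then share the same allocation
    // across every inbox: cloning a Bytes value only bumps a reference count
    // instead of copying the whole payload.
    let payload = Bytes::from(serde_json::to_vec(&json!({ "type": "Like" })).unwrap());
    for _inbox in 0..3 {
        let per_inbox = payload.clone(); // cheap, no reallocation
        drop(per_inbox); // stand-in for handing it to a delivery task
    }
}
```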

In terms of #43, I don't know how correct it could be if you sent an activity to a webserver that doesn't support compression. Normally HTTP compression works by the client making a request with the appropriate Accept-Encoding headers, and the response is then compressed or not based on what that client accepts. In this scenario we are making a POST, so it's the request body we'd be compressing, and without making an initial request first to work out what the server accepts, we might find that it doesn't support compression.

I am cautiously optimistic that the changes I've made should help with performance. However it would need to be tested in more of a "real world" scenario, as the benchmarking I'm doing is rather synthetic.

@phiresky (Contributor)

@cetra3 could you share what code / command you use for benchmarking now?

@phiresky (Contributor)

Here's my suggestion (as a git patch). Performance seems mostly unaffected by these changes based on your test_activity_queue_workers benchmark (without dodgy).

diff --git a/Cargo.toml b/Cargo.toml
index 37471ab..c0c77ff 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -86,3 +86,6 @@ path = "examples/local_federation/main.rs"
 [[example]]
 name = "live_federation"
 path = "examples/live_federation/main.rs"
+
+[profile.release]
+lto = "fat" # doesn't seem to make much of a difference (maybe 10%) but still a good idea?
\ No newline at end of file
diff --git a/src/activity_queue.rs b/src/activity_queue.rs
index 817cbf2..e23c899 100644
--- a/src/activity_queue.rs
+++ b/src/activity_queue.rs
@@ -30,8 +30,8 @@ use std::{
     time::{Duration, SystemTime},
 };
 use tokio::{
-    sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
-    task::JoinHandle,
+    sync::mpsc::{unbounded_channel, UnboundedSender},
+    task::{JoinHandle, JoinSet},
 };
 use tracing::{debug, info, warn};
 use url::Url;
@@ -219,11 +219,9 @@ pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap {
 /// Uses an unbounded mpsc queue for communication (i.e, all messages are in memory)
 pub(crate) struct ActivityQueue {
     // Our "background" tasks
-    senders: Vec<UnboundedSender<SendActivityTask>>,
-    handles: Vec<JoinHandle<()>>,
+    sender: UnboundedSender<SendActivityTask>,
+    recv_handle: JoinHandle<()>,
     reset_handle: JoinHandle<()>,
-    // Round robin of the sender list
-    last_sender_idx: AtomicUsize,
     // Stats shared between the queue and workers
     stats: Arc<Stats>,
 }
@@ -251,26 +249,24 @@ struct RetryStrategy {
 async fn worker(
     client: ClientWithMiddleware,
     timeout: Duration,
-    mut receiver: UnboundedReceiver<SendActivityTask>,
+    message: SendActivityTask,
     stats: Arc<Stats>,
     strategy: RetryStrategy,
 ) {
-    while let Some(message) = receiver.recv().await {
-        stats.pending.fetch_sub(1, Ordering::Relaxed);
-        stats.running.fetch_add(1, Ordering::Relaxed);
+    stats.pending.fetch_sub(1, Ordering::Relaxed);
+    stats.running.fetch_add(1, Ordering::Relaxed);
 
-        let outcome = retry(|| sign_and_send(&message, &client, timeout), strategy).await;
+    let outcome = retry(|| sign_and_send(&message, &client, timeout), strategy).await;
 
-        // "Running" has finished, check the outcome
-        stats.running.fetch_sub(1, Ordering::Relaxed);
+    // "Running" has finished, check the outcome
+    stats.running.fetch_sub(1, Ordering::Relaxed);
 
-        match outcome {
-            Ok(_) => {
-                stats.completed_last_hour.fetch_add(1, Ordering::Relaxed);
-            }
-            Err(_err) => {
-                stats.dead_last_hour.fetch_add(1, Ordering::Relaxed);
-            }
+    match outcome {
+        Ok(_) => {
+            stats.completed_last_hour.fetch_add(1, Ordering::Relaxed);
+        }
+        Err(_err) => {
+            stats.dead_last_hour.fetch_add(1, Ordering::Relaxed);
         }
     }
 }
@@ -282,10 +278,6 @@ impl ActivityQueue {
         timeout: Duration,
         strategy: RetryStrategy,
     ) -> Self {
-        // Keep a vec of senders to send our messages to
-        let mut senders = Vec::with_capacity(worker_count);
-        let mut handles = Vec::with_capacity(worker_count);
-
         let stats: Arc<Stats> = Default::default();
 
         // This task clears the dead/completed stats every hour
@@ -299,36 +291,42 @@ impl ActivityQueue {
             }
         });
 
-        // Spawn our workers
-        for _ in 0..worker_count {
-            let (sender, receiver) = unbounded_channel();
-            handles.push(tokio::spawn(worker(
-                client.clone(),
-                timeout,
-                receiver,
-                stats.clone(),
-                strategy,
-            )));
-            senders.push(sender);
-        }
+        let (sender, mut receiver) = unbounded_channel();
+        let stats2 = stats.clone();
+        let recv_handle = tokio::spawn(async move {
+            let mut join_set = JoinSet::new();
+            while let Some(task) = receiver.recv().await {
+                join_set.spawn(worker(
+                    client.clone(),
+                    timeout,
+                    task,
+                    stats2.clone(),
+                    strategy,
+                ));
+                while join_set.len() > worker_count {
+                    // prevent there being more than worker_count running tasks
+                    join_set.join_next().await;
+                }
+            }
+            // make sure all tasks are done during shut down
+            while !join_set.is_empty() {
+                join_set.join_next().await;
+            }
+        });
 
         Self {
-            senders,
-            handles,
+            sender,
+            recv_handle,
             reset_handle,
-            last_sender_idx: AtomicUsize::new(0),
             stats,
         }
     }
     async fn queue(&self, message: SendActivityTask) -> Result<(), anyhow::Error> {
-        // really basic round-robin to our workers, we just do mod on the len of senders
-        let idx_to_send = self.last_sender_idx.fetch_add(1, Ordering::Relaxed) % self.senders.len();
-
         // Set a queue to pending
         self.stats.pending.fetch_add(1, Ordering::Relaxed);
 
         // Send to one of our workers
-        self.senders[idx_to_send].send(message)?;
+        self.sender.send(message)?;
 
         Ok(())
     }
@@ -340,15 +338,13 @@ impl ActivityQueue {
     #[allow(unused)]
     // Drops all the senders and shuts down the workers
     async fn shutdown(self) -> Result<Stats, anyhow::Error> {
-        drop(self.senders);
+        drop(self.sender);
 
         // stop the reset counter task
         self.reset_handle.abort();
         self.reset_handle.await.ok();
 
-        for handle in self.handles {
-            handle.await?;
-        }
+        self.recv_handle.await?;
 
         Arc::try_unwrap(self.stats).map_err(|_| anyhow!("Could not retrieve stats"))
     }
@@ -438,32 +434,54 @@ mod tests {
         }
         Ok(())
     }
+    // This will periodically send back internal errors to test the retry
+    async fn reliable_handler(
+        State(state): State<Arc<AtomicUsize>>,
+        headers: HeaderMap,
+        body: Bytes,
+    ) -> Result<(), StatusCode> {
+        debug!("Headers:{:?}", headers);
+        debug!("Body len:{}", body.len());
+
+        if state.fetch_add(1, Ordering::Relaxed) % 20 == 0 {
+            return Err(StatusCode::INTERNAL_SERVER_ERROR);
+        }
+        Ok(())
+    }
 
-    async fn test_server() {
+    async fn test_server(dodgy: bool) {
         use axum::{routing::post, Router};
 
         // We should break every now and then ;)
         let state = Arc::new(AtomicUsize::new(0));
-
-        let app = Router::new()
-            .route("/", post(dodgy_handler))
-            .with_state(state);
-
+        let app = if dodgy {
+            Router::new()
+                .route("/", post(dodgy_handler))
+                .with_state(state)
+        } else {
+            Router::new()
+                .route("/", post(reliable_handler))
+                .with_state(state)
+        };
         axum::Server::bind(&"0.0.0.0:8001".parse().unwrap())
             .serve(app.into_make_service())
             .await
             .unwrap();
     }
 
+    // #[ignore]
+    #[tokio::test(flavor = "multi_thread")]
+    async fn bench_activity_queue_workers() {
+        activity_queue_workers(64, 100_000, false).await
+    }
     #[tokio::test(flavor = "multi_thread")]
-    // Queues 10_000 messages and then asserts that the worker runs them
     async fn test_activity_queue_workers() {
-        let num_workers = 64;
-        let num_messages: usize = 100;
-
-        tokio::spawn(test_server());
+        activity_queue_workers(64, 100, true).await
+    }
+    // Queues 10_000 messages and then asserts that the worker runs them
+    async fn activity_queue_workers(num_workers: usize, num_messages: usize, dodgy: bool) {
+        tokio::spawn(test_server(dodgy));
 
-        /*
         // uncomment for debug logs & stats
         use tracing::log::LevelFilter;
 
@@ -472,7 +490,6 @@ mod tests {
             .filter_module("activitypub_federation", LevelFilter::Info)
             .format_timestamp(None)
             .init();
-        */
 
         let activity_queue = ActivityQueue::new(
             reqwest::Client::default().into(),

@cetra3 (Contributor, Author) commented Jun 17, 2023

@phiresky Thanks for your diff! I couldn't apply it to my branch for some reason, but I have made adjustments in the spirit of the changes you introduced.

I am doing the following to benchmark:

  • bumping up num_messages to 100000
  • uncommenting the debug logs
  • not making it a "dodgy" server by commenting out the intermittent 500s

I've adjusted it to use tokio::spawn directly rather than having the worker tasks hang around, and I don't really see a dip in performance. I did try that yesterday but was seeing a reduction in throughput. It looks like if there are too many workers there is some bookkeeping getting in the way of performance, but if you set num_workers to a low but sensible number, it seems to work OK.

Having said that, if we have tasks that are going to hang around for 60 hours, it's going to need to be fairly high, since there could be a lot of outstanding requests retrying.

For that reason, I've adjusted it to:

  • Retry a few times with the same signature, 60 seconds apart (this saves us recomputing the signature)
  • Spawn another task which is essentially a lower-priority retry queue that can sleep and take things a lot slower. This means servers that are behaving well receive activities pretty quickly, while bad servers get lower priority. (Rough sketch below.)
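A rough sketch of that two-stage shape; sign_and_send and the channel are illustrative stand-ins, not the actual implementation:

```rust
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;

#[derive(Clone)]
struct Task; // stand-in for the real SendActivityTask

async fn sign_and_send(_task: &Task) -> Result<(), ()> {
    Err(()) // pretend the destination is unreachable
}

// Stage 1: a few quick attempts, 60 seconds apart, reusing the same signature.
// Stage 2: hand the task off to a lower-priority retry queue that re-signs and
// backs off much more slowly, so well-behaved servers aren't held up by bad ones.
async fn deliver(task: Task, slow_retry: UnboundedSender<Task>) {
    for attempt in 0..3 {
        if sign_and_send(&task).await.is_ok() {
            return;
        }
        if attempt < 2 {
            tokio::time::sleep(Duration::from_secs(60)).await;
        }
    }
    // All quick attempts failed: defer to the slow queue.
    let _ = slow_retry.send(task);
}
```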

@cetra3 force-pushed the remove_actix_rt branch 2 times, most recently from abb431a to 5f16ed4 on June 17, 2023 02:12
@cetra3 force-pushed the remove_actix_rt branch from 5f16ed4 to 5154060 on June 17, 2023 02:30
@phiresky (Contributor)

Great!

Having a separate queue for retries definitely makes sense, though long term they should probably be stored in the DB or a file instead after the first or second failure (or while shutting down).

I think your use of the semaphore is a bit wrong, I'll add an inline comment about that.

@phiresky (Contributor)

I just realized there's another issue with this whole thing: it replaces the main runtime with the normal tokio runtime, but I think when this library is instantiated from Lemmy it has to run in Lemmy's runtime, which is actix-rt and can't be switched over because other Lemmy code also uses actors?

@cetra3 (Contributor, Author) commented Jun 18, 2023

@phiresky You can run this within actix-rt as it uses tokio under the hood, i.e. it will be compatible. However, I do have a PR open that uses tokio::main, as I feel it will give better performance, especially with DB pools and other things. The only actors Lemmy was using were in the websocket code, which has been pulled out of the latest release. I've run the PR locally without issue, but obviously more testing is required.

Good point on the join_set. The permit was passed down to the child task so the semaphore was working fine, but the join_set would continue to grow larger. I've adjusted the join_set to apply backpressure as per your example, but before the task is spawned.

I do agree there should be some persistence added to the outgoing activity queue at some point, especially for retries.

@Nutomic merged commit c356265 into LemmyNet:main on Jun 20, 2023
@Nutomic (Member) commented Jun 20, 2023

Looks good, thanks! However, you didn't remove the worker_count parameter after all. What's the reason for keeping it?

@phiresky (Contributor) commented Jun 20, 2023

The worker_count parameter has a different meaning after this PR, but it is still needed for two reasons:

  1. A JoinSet is used, and JoinSet has (for some reason) no method to clear all already-completed items, so without a limit it would grow forever. Maybe this could also be fixed by just polling join_next() until it's not ready anymore.
    • The JoinSet is needed to ensure tasks are finished on shutdown. I just found an alternative method for that here.
  2. To put backpressure somewhere. Right now it's kind of useless because the mpsc queue is unbounded anyway. Longer term, the queue should also be bounded I think, and excess items logged and dropped: if it can't send as fast as it's getting new items, then letting it queue up more and more doesn't improve the situation. (See the sketch below.)

In summary, I think this should be better than it was before, but I'm pretty sure it's not the optimal solution, and I guess none of us know how that would look right now ;)
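For point 2, a bounded queue with explicit drop-and-log on overflow might look roughly like this (illustrative sketch, not what the PR implements):

```rust
use tokio::sync::mpsc::{self, error::TrySendError};
use tracing::warn;

struct Task; // stand-in for the real activity type

// With a bounded channel, try_send fails fast when the queue is full, so the
// caller can log and drop the activity instead of queueing ever more work.
fn enqueue(sender: &mpsc::Sender<Task>, task: Task) {
    match sender.try_send(task) {
        Ok(()) => {}
        Err(TrySendError::Full(_dropped)) => {
            warn!("activity queue full, dropping outgoing activity");
        }
        Err(TrySendError::Closed(_)) => {
            warn!("activity queue has shut down");
        }
    }
}
```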

@phiresky (Contributor) commented Jul 6, 2023

@cetra3 I wanted to mention that I think it should be possible after all to clear out finished tasks, even if their number is not bounded, by directly calling poll_join_next until it returns Poll::Pending.
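Something like this, as a sketch of that idea (error handling of the JoinError results is omitted):

```rust
use std::task::Poll;
use tokio::task::JoinSet;

// Reap every task that has already finished without awaiting the ones still
// in flight, so the JoinSet doesn't grow forever even when its size is not
// bounded.
async fn reap_finished<T: 'static>(join_set: &mut JoinSet<T>) {
    std::future::poll_fn(|cx| {
        // Keep draining while completed tasks are immediately available; stop
        // as soon as poll_join_next returns Pending (or the set is empty).
        while let Poll::Ready(Some(_result)) = join_set.poll_join_next(cx) {}
        Poll::Ready(())
    })
    .await;
}
```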


Successfully merging this pull request may close these issues:

  • Remove actix-rt dependency

4 participants