Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve api response times by doing send_activity asynchronously #3493

Merged
merged 11 commits into from
Jul 10, 2023
2 changes: 1 addition & 1 deletion api_tests/run-federation-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
set -e

export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432

export LEMMY_SYNCHRONOUS_FEDERATION=1 # currently this is true in debug by default, but still.
pushd ..
cargo build
rm target/lemmy_server || true
Expand Down
2 changes: 1 addition & 1 deletion crates/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod site;

#[async_trait::async_trait(?Send)]
pub trait Perform {
type Response: serde::ser::Serialize + Send;
type Response: serde::ser::Serialize + Send + Clone + Sync;

async fn perform(&self, context: &Data<LemmyContext>) -> Result<Self::Response, LemmyError>;
}
Expand Down
2 changes: 1 addition & 1 deletion crates/api_common/src/custom_emoji.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct DeleteCustomEmoji {
pub auth: Sensitive<String>,
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "full", derive(TS))]
#[cfg_attr(feature = "full", ts(export))]
/// The response for deleting a custom emoji.
Expand Down
2 changes: 1 addition & 1 deletion crates/api_common/src/site.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ pub struct PurgeComment {
pub auth: Sensitive<String>,
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "full", derive(TS))]
#[cfg_attr(feature = "full", ts(export))]
/// The response for purged items.
Expand Down
2 changes: 1 addition & 1 deletion crates/api_crud/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod user;

#[async_trait::async_trait(?Send)]
pub trait PerformCrud {
type Response: serde::ser::Serialize + Send;
type Response: serde::ser::Serialize + Send + Clone + Sync;

async fn perform(&self, context: &Data<LemmyContext>) -> Result<Self::Response, LemmyError>;
}
40 changes: 26 additions & 14 deletions crates/api_crud/src/post/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ use lemmy_db_schema::{
use lemmy_db_views_actor::structs::CommunityView;
use lemmy_utils::{
error::LemmyError,
spawn_try_task,
utils::{
slurs::{check_slurs, check_slurs_opt},
validation::{check_url_scheme, clean_url_params, is_valid_body_field, is_valid_post_title},
},
SYNCHRONOUS_FEDERATION,
};
use tracing::{warn, Instrument};
use tracing::Instrument;
use url::Url;
use webmention::{Webmention, WebmentionError};

Expand Down Expand Up @@ -143,20 +145,30 @@ impl PerformCrud for CreatePost {
// Mark the post as read
mark_post_as_read(person_id, post_id, context.pool()).await?;

if let Some(url) = &updated_post.url {
let mut webmention =
Webmention::new::<Url>(updated_post.ap_id.clone().into(), url.clone().into())?;
webmention.set_checked(true);
match webmention
.send()
.instrument(tracing::info_span!("Sending webmention"))
.await
{
Ok(_) => {}
Err(WebmentionError::NoEndpointDiscovered(_)) => {}
Err(e) => warn!("Failed to send webmention: {}", e),
if let Some(url) = updated_post.url.clone() {
let task = async move {
let mut webmention =
Webmention::new::<Url>(updated_post.ap_id.clone().into(), url.clone().into())?;
webmention.set_checked(true);
match webmention
.send()
.instrument(tracing::info_span!("Sending webmention"))
.await
{
Err(WebmentionError::NoEndpointDiscovered(_)) => Ok(()),
Ok(_) => Ok(()),
Err(e) => Err(LemmyError::from_error_message(
e,
"Couldn't send webmention",
)),
}
};
if *SYNCHRONOUS_FEDERATION {
task.await?;
} else {
spawn_try_task(task);
}
}
};

build_post_response(context, community_id, person_id, post_id).await
}
Expand Down
2 changes: 1 addition & 1 deletion crates/apub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ where

#[async_trait::async_trait]
pub trait SendActivity: Sync {
type Response: Sync + Send;
type Response: Sync + Send + Clone;

async fn send_activity(
_request: &Self,
Expand Down
28 changes: 28 additions & 0 deletions crates/utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ pub mod request;
pub mod utils;
pub mod version;

use error::LemmyError;
use futures::Future;
use once_cell::sync::Lazy;
use std::time::Duration;
use tracing::Instrument;

pub type ConnectionId = usize;

Expand All @@ -31,3 +35,27 @@ macro_rules! location_info {
)
};
}

/// if true, all federation should happen synchronously. useful for debugging and testing.
/// defaults to true on debug mode, false on releasemode
/// override to true by setting env LEMMY_SYNCHRONOUS_FEDERATION=1
/// override to false by setting env LEMMY_SYNCHRONOUS_FEDERATION=""
pub static SYNCHRONOUS_FEDERATION: Lazy<bool> = Lazy::new(|| {
std::env::var("LEMMY_SYNCHRONOUS_FEDERATION")
.map(|s| !s.is_empty())
.unwrap_or(cfg!(debug_assertions))
});

/// tokio::spawn, but accepts a future that may fail and also
/// * logs errors
/// * attaches the spawned task to the tracing span of the caller for better logging
pub fn spawn_try_task(task: impl Future<Output = Result<(), LemmyError>> + Send + 'static) {
tokio::spawn(
async {
if let Err(e) = task.await {
tracing::warn!("error in spawn: {e}");
}
}
.in_current_span(), // this makes sure the inner tracing gets the same context as where spawn was called
);
}
22 changes: 17 additions & 5 deletions src/api_routes_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ use lemmy_apub::{
},
SendActivity,
};
use lemmy_utils::rate_limit::RateLimitCell;
use lemmy_utils::{rate_limit::RateLimitCell, spawn_try_task, SYNCHRONOUS_FEDERATION};
use serde::Deserialize;

pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) {
Expand Down Expand Up @@ -382,8 +382,14 @@ where
+ 'static,
{
let res = data.perform(&context).await?;
SendActivity::send_activity(&data, &res, &apub_data).await?;
Ok(HttpResponse::Ok().json(res))
let res_clone = res.clone();
let fed_task = async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await };
if *SYNCHRONOUS_FEDERATION {
fed_task.await?;
} else {
spawn_try_task(fed_task);
}
Ok(HttpResponse::Ok().json(&res))
}

async fn route_get<'a, Data>(
Expand Down Expand Up @@ -432,8 +438,14 @@ where
+ 'static,
{
let res = data.perform(&context).await?;
SendActivity::send_activity(&data, &res, &apub_data).await?;
Ok(HttpResponse::Ok().json(res))
let res_clone = res.clone();
let fed_task = async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await };
if *SYNCHRONOUS_FEDERATION {
fed_task.await?;
} else {
spawn_try_task(fed_task);
}
Ok(HttpResponse::Ok().json(&res))
}

async fn route_get_crud<'a, Data>(
Expand Down
9 changes: 7 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ use lemmy_db_schema::{
utils::{build_db_pool, get_database_url, run_migrations},
};
use lemmy_routes::{feeds, images, nodeinfo, webfinger};
use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, settings::SETTINGS};
use lemmy_utils::{
error::LemmyError,
rate_limit::RateLimitCell,
settings::SETTINGS,
SYNCHRONOUS_FEDERATION,
};
use reqwest::Client;
use reqwest_middleware::ClientBuilder;
use reqwest_tracing::TracingMiddleware;
Expand Down Expand Up @@ -139,7 +144,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
.http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT)
.worker_count(settings.worker_count)
.retry_count(settings.retry_count)
.debug(cfg!(debug_assertions))
.debug(*SYNCHRONOUS_FEDERATION)
.http_signature_compat(true)
.url_verifier(Box::new(VerifyUrlData(context.pool().clone())))
.build()
Expand Down
2 changes: 1 addition & 1 deletion src/prometheus_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ fn create_db_pool_metrics() -> DbPoolMetrics {
.register(Box::new(metrics.available.clone()))
.unwrap();

return metrics;
metrics
}

async fn collect_db_pool_metrics(context: &PromContext) {
Expand Down