From a83b8d8f1e2949633aea1537bd44d82630a61773 Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 5 Jul 2023 16:01:13 +0000 Subject: [PATCH 1/8] do send_activity after http response --- crates/api/src/lib.rs | 2 +- crates/api_common/src/custom_emoji.rs | 2 +- crates/api_common/src/site.rs | 2 +- crates/api_crud/src/lib.rs | 2 +- crates/apub/src/lib.rs | 2 +- src/api_routes_http.rs | 18 ++++++++++++++---- 6 files changed, 19 insertions(+), 9 deletions(-) diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs index 7ac3cec726..e4e496ba99 100644 --- a/crates/api/src/lib.rs +++ b/crates/api/src/lib.rs @@ -17,7 +17,7 @@ mod site; #[async_trait::async_trait(?Send)] pub trait Perform { - type Response: serde::ser::Serialize + Send; + type Response: serde::ser::Serialize + Send + Clone + Sync; async fn perform(&self, context: &Data) -> Result; } diff --git a/crates/api_common/src/custom_emoji.rs b/crates/api_common/src/custom_emoji.rs index 550dd7a3fc..7f3461ca79 100644 --- a/crates/api_common/src/custom_emoji.rs +++ b/crates/api_common/src/custom_emoji.rs @@ -43,7 +43,7 @@ pub struct DeleteCustomEmoji { pub auth: Sensitive, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] #[cfg_attr(feature = "full", derive(TS))] #[cfg_attr(feature = "full", ts(export))] /// The response for deleting a custom emoji. diff --git a/crates/api_common/src/site.rs b/crates/api_common/src/site.rs index 865acc0dc2..bc7687e3ce 100644 --- a/crates/api_common/src/site.rs +++ b/crates/api_common/src/site.rs @@ -395,7 +395,7 @@ pub struct PurgeComment { pub auth: Sensitive, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] #[cfg_attr(feature = "full", derive(TS))] #[cfg_attr(feature = "full", ts(export))] /// The response for purged items. diff --git a/crates/api_crud/src/lib.rs b/crates/api_crud/src/lib.rs index a10309fc94..b9449ca69d 100644 --- a/crates/api_crud/src/lib.rs +++ b/crates/api_crud/src/lib.rs @@ -12,7 +12,7 @@ mod user; #[async_trait::async_trait(?Send)] pub trait PerformCrud { - type Response: serde::ser::Serialize + Send; + type Response: serde::ser::Serialize + Send + Clone + Sync; async fn perform(&self, context: &Data) -> Result; } diff --git a/crates/apub/src/lib.rs b/crates/apub/src/lib.rs index a5bc41d1fd..38dd4cba5e 100644 --- a/crates/apub/src/lib.rs +++ b/crates/apub/src/lib.rs @@ -181,7 +181,7 @@ where #[async_trait::async_trait] pub trait SendActivity: Sync { - type Response: Sync + Send; + type Response: Sync + Send + Clone; async fn send_activity( _request: &Self, diff --git a/src/api_routes_http.rs b/src/api_routes_http.rs index ca0fa4c225..085d6b3be7 100644 --- a/src/api_routes_http.rs +++ b/src/api_routes_http.rs @@ -382,8 +382,13 @@ where + 'static, { let res = data.perform(&context).await?; - SendActivity::send_activity(&data, &res, &apub_data).await?; - Ok(HttpResponse::Ok().json(res)) + let res_clone = res.clone(); + tokio::spawn(async move { + if let Err(e) = SendActivity::send_activity(&data, &res_clone, &apub_data).await { + tracing::warn!("could not send_activity: {e}"); + } + }); + Ok(HttpResponse::Ok().json(&res)) } async fn route_get<'a, Data>( @@ -432,8 +437,13 @@ where + 'static, { let res = data.perform(&context).await?; - SendActivity::send_activity(&data, &res, &apub_data).await?; - Ok(HttpResponse::Ok().json(res)) + let res_clone = res.clone(); + tokio::spawn(async move { + if let Err(e) = SendActivity::send_activity(&data, &res_clone, &apub_data).await { + tracing::warn!("could not send_activity crud: {e}"); + } + }); + Ok(HttpResponse::Ok().json(&res)) } async fn route_get_crud<'a, Data>( From 8791b61eb02b260a6e4807f0254b33b78d01c726 Mon Sep 17 00:00:00 2001 From: phiresky Date: Thu, 6 Jul 2023 11:10:53 +0000 Subject: [PATCH 2/8] move to util function --- crates/api_crud/src/post/create.rs | 29 ++++++++++++++++------------- crates/utils/build.rs | 2 +- crates/utils/src/lib.rs | 17 +++++++++++++++++ src/api_routes_http.rs | 14 +++----------- 4 files changed, 37 insertions(+), 25 deletions(-) diff --git a/crates/api_crud/src/post/create.rs b/crates/api_crud/src/post/create.rs index 1dcc902411..a0162685e0 100644 --- a/crates/api_crud/src/post/create.rs +++ b/crates/api_crud/src/post/create.rs @@ -29,6 +29,7 @@ use lemmy_db_schema::{ use lemmy_db_views_actor::structs::CommunityView; use lemmy_utils::{ error::LemmyError, + spawn_try_task, utils::{ slurs::{check_slurs, check_slurs_opt}, validation::{clean_url_params, is_valid_body_field, is_valid_post_title}, @@ -142,19 +143,21 @@ impl PerformCrud for CreatePost { // Mark the post as read mark_post_as_read(person_id, post_id, context.pool()).await?; - if let Some(url) = &updated_post.url { - let mut webmention = - Webmention::new::(updated_post.ap_id.clone().into(), url.clone().into())?; - webmention.set_checked(true); - match webmention - .send() - .instrument(tracing::info_span!("Sending webmention")) - .await - { - Ok(_) => {} - Err(WebmentionError::NoEndpointDiscovered(_)) => {} - Err(e) => warn!("Failed to send webmention: {}", e), - } + if let Some(url) = updated_post.url.clone() { + spawn_try_task(async move { + let mut webmention = + Webmention::new::(updated_post.ap_id.clone().into(), url.clone().into())?; + webmention.set_checked(true); + match webmention + .send() + .instrument(tracing::info_span!("Sending webmention")) + .await + { + Err(WebmentionError::NoEndpointDiscovered(_)) => Ok(()), + Ok(_) => Ok(()), + Err(e) => Err(LemmyError::from_error_message(e, "Couldn't send webmention")), + } + }); } build_post_response(context, community_id, person_id, post_id).await diff --git a/crates/utils/build.rs b/crates/utils/build.rs index b336f8c560..95e6fe5d76 100644 --- a/crates/utils/build.rs +++ b/crates/utils/build.rs @@ -8,4 +8,4 @@ fn main() -> Result<(), Box> { .generate()?; Ok(()) -} +} \ No newline at end of file diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index e5d07db2c6..3f23cadc49 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -14,7 +14,10 @@ pub mod request; pub mod utils; pub mod version; +use error::LemmyError; +use futures::Future; use std::time::Duration; +use tracing::Instrument; pub type ConnectionId = usize; @@ -31,3 +34,17 @@ macro_rules! location_info { ) }; } + +/// tokio::spawn, but accepts a future that may fail and also +/// * logs errors +/// * attaches the spawned task to the tracing span of the caller for better logging +pub fn spawn_try_task(task: impl Future> + Send + 'static) { + tokio::spawn( + async { + if let Err(e) = task.await { + tracing::warn!("error in spawn: {e}"); + } + } + .in_current_span(), // this makes sure the inner tracing gets the same context as where spawn was called + ); +} diff --git a/src/api_routes_http.rs b/src/api_routes_http.rs index 085d6b3be7..2a10aa7321 100644 --- a/src/api_routes_http.rs +++ b/src/api_routes_http.rs @@ -105,7 +105,7 @@ use lemmy_apub::{ }, SendActivity, }; -use lemmy_utils::rate_limit::RateLimitCell; +use lemmy_utils::{rate_limit::RateLimitCell, spawn_try_task}; use serde::Deserialize; pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) { @@ -383,11 +383,7 @@ where { let res = data.perform(&context).await?; let res_clone = res.clone(); - tokio::spawn(async move { - if let Err(e) = SendActivity::send_activity(&data, &res_clone, &apub_data).await { - tracing::warn!("could not send_activity: {e}"); - } - }); + spawn_try_task(async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await }); Ok(HttpResponse::Ok().json(&res)) } @@ -438,11 +434,7 @@ where { let res = data.perform(&context).await?; let res_clone = res.clone(); - tokio::spawn(async move { - if let Err(e) = SendActivity::send_activity(&data, &res_clone, &apub_data).await { - tracing::warn!("could not send_activity crud: {e}"); - } - }); + spawn_try_task(async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await }); Ok(HttpResponse::Ok().json(&res)) } From 4be3872f62a09ee91d8d5b8ce49d48e5797a5c22 Mon Sep 17 00:00:00 2001 From: phiresky Date: Thu, 6 Jul 2023 11:11:31 +0000 Subject: [PATCH 3/8] format --- crates/api_crud/src/post/create.rs | 7 +++++-- crates/utils/build.rs | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/api_crud/src/post/create.rs b/crates/api_crud/src/post/create.rs index a0162685e0..9a994cc67e 100644 --- a/crates/api_crud/src/post/create.rs +++ b/crates/api_crud/src/post/create.rs @@ -35,7 +35,7 @@ use lemmy_utils::{ validation::{clean_url_params, is_valid_body_field, is_valid_post_title}, }, }; -use tracing::{warn, Instrument}; +use tracing::Instrument; use url::Url; use webmention::{Webmention, WebmentionError}; @@ -155,7 +155,10 @@ impl PerformCrud for CreatePost { { Err(WebmentionError::NoEndpointDiscovered(_)) => Ok(()), Ok(_) => Ok(()), - Err(e) => Err(LemmyError::from_error_message(e, "Couldn't send webmention")), + Err(e) => Err(LemmyError::from_error_message( + e, + "Couldn't send webmention", + )), } }); } diff --git a/crates/utils/build.rs b/crates/utils/build.rs index 95e6fe5d76..b336f8c560 100644 --- a/crates/utils/build.rs +++ b/crates/utils/build.rs @@ -8,4 +8,4 @@ fn main() -> Result<(), Box> { .generate()?; Ok(()) -} \ No newline at end of file +} From fdf55dab3e618cdbe219f211141f1970edd6aa2b Mon Sep 17 00:00:00 2001 From: phiresky Date: Thu, 6 Jul 2023 11:30:07 +0000 Subject: [PATCH 4/8] fix prometheus --- src/prometheus_metrics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prometheus_metrics.rs b/src/prometheus_metrics.rs index 1ff47a54ba..4fe8150f2b 100644 --- a/src/prometheus_metrics.rs +++ b/src/prometheus_metrics.rs @@ -103,7 +103,7 @@ fn create_db_pool_metrics() -> DbPoolMetrics { .register(Box::new(metrics.available.clone())) .unwrap(); - return metrics; + metrics } async fn collect_db_pool_metrics(context: &PromContext) { From b1d967742ee0b1f6fa18f1682eb0b34beddec1e5 Mon Sep 17 00:00:00 2001 From: phiresky Date: Thu, 6 Jul 2023 14:33:08 +0000 Subject: [PATCH 5/8] make synchronous federation configurable --- api_tests/run-federation-test.sh | 2 +- crates/api_crud/src/post/create.rs | 12 +++++++++--- crates/utils/src/lib.rs | 11 +++++++++++ src/api_routes_http.rs | 16 +++++++++++++--- src/lib.rs | 4 ++-- 5 files changed, 36 insertions(+), 9 deletions(-) diff --git a/api_tests/run-federation-test.sh b/api_tests/run-federation-test.sh index ed4bba4167..abced2ad6c 100755 --- a/api_tests/run-federation-test.sh +++ b/api_tests/run-federation-test.sh @@ -2,7 +2,7 @@ set -e export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432 - +export LEMMY_SYNCHRONOUS_FEDERATION=1 # currently this is true in debug by default, but still. pushd .. cargo build rm target/lemmy_server || true diff --git a/crates/api_crud/src/post/create.rs b/crates/api_crud/src/post/create.rs index 9a994cc67e..4691ac4e86 100644 --- a/crates/api_crud/src/post/create.rs +++ b/crates/api_crud/src/post/create.rs @@ -29,6 +29,7 @@ use lemmy_db_schema::{ use lemmy_db_views_actor::structs::CommunityView; use lemmy_utils::{ error::LemmyError, + SYNCHRONOUS_FEDERATION, spawn_try_task, utils::{ slurs::{check_slurs, check_slurs_opt}, @@ -144,7 +145,7 @@ impl PerformCrud for CreatePost { mark_post_as_read(person_id, post_id, context.pool()).await?; if let Some(url) = updated_post.url.clone() { - spawn_try_task(async move { + let task = async move { let mut webmention = Webmention::new::(updated_post.ap_id.clone().into(), url.clone().into())?; webmention.set_checked(true); @@ -160,8 +161,13 @@ impl PerformCrud for CreatePost { "Couldn't send webmention", )), } - }); - } + }; + if *SYNCHRONOUS_FEDERATION { + task.await?; + } else { + spawn_try_task(task); + } + }; build_post_response(context, community_id, person_id, post_id).await } diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 3f23cadc49..2c71c58d03 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -16,6 +16,7 @@ pub mod version; use error::LemmyError; use futures::Future; +use once_cell::sync::Lazy; use std::time::Duration; use tracing::Instrument; @@ -35,6 +36,16 @@ macro_rules! location_info { }; } +/// if true, all federation should happen synchronously. useful for debugging and testing. +/// defaults to true on debug mode, false on releasemode +/// override to true by setting env LEMMY_SYNCHRONOUS_FEDERATION=1 +/// override to false by setting env LEMMY_SYNCHRONOUS_FEDERATION="" +pub static SYNCHRONOUS_FEDERATION: Lazy = Lazy::new(|| { + std::env::var("LEMMY_SYNCHRONOUS_FEDERATION") + .map(|s| !s.is_empty()) + .unwrap_or(cfg!(debug_assertions)) +}); + /// tokio::spawn, but accepts a future that may fail and also /// * logs errors /// * attaches the spawned task to the tracing span of the caller for better logging diff --git a/src/api_routes_http.rs b/src/api_routes_http.rs index 2a10aa7321..32247b48a4 100644 --- a/src/api_routes_http.rs +++ b/src/api_routes_http.rs @@ -105,7 +105,7 @@ use lemmy_apub::{ }, SendActivity, }; -use lemmy_utils::{rate_limit::RateLimitCell, spawn_try_task}; +use lemmy_utils::{rate_limit::RateLimitCell, SYNCHRONOUS_FEDERATION, spawn_try_task}; use serde::Deserialize; pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) { @@ -383,7 +383,12 @@ where { let res = data.perform(&context).await?; let res_clone = res.clone(); - spawn_try_task(async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await }); + let fed_task = async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await }; + if *SYNCHRONOUS_FEDERATION { + fed_task.await?; + } else { + spawn_try_task(fed_task); + } Ok(HttpResponse::Ok().json(&res)) } @@ -434,7 +439,12 @@ where { let res = data.perform(&context).await?; let res_clone = res.clone(); - spawn_try_task(async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await }); + let fed_task = async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await }; + if *SYNCHRONOUS_FEDERATION { + fed_task.await?; + } else { + spawn_try_task(fed_task); + } Ok(HttpResponse::Ok().json(&res)) } diff --git a/src/lib.rs b/src/lib.rs index ce62d0d311..6c22b96fec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,7 @@ use lemmy_db_schema::{ utils::{build_db_pool, get_database_url, run_migrations}, }; use lemmy_routes::{feeds, images, nodeinfo, webfinger}; -use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, settings::SETTINGS}; +use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, settings::SETTINGS, SYNCHRONOUS_FEDERATION}; use reqwest::Client; use reqwest_middleware::ClientBuilder; use reqwest_tracing::TracingMiddleware; @@ -139,7 +139,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { .http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT) .worker_count(settings.worker_count) .retry_count(settings.retry_count) - .debug(cfg!(debug_assertions)) + .debug(*SYNCHRONOUS_FEDERATION) .http_signature_compat(true) .url_verifier(Box::new(VerifyUrlData(context.pool().clone()))) .build() From db478195aa5575a6d8836d6d91c4649cb79338ce Mon Sep 17 00:00:00 2001 From: phiresky Date: Thu, 6 Jul 2023 14:43:57 +0000 Subject: [PATCH 6/8] cargo fmt --- crates/api_crud/src/post/create.rs | 2 +- src/api_routes_http.rs | 2 +- src/lib.rs | 7 ++++++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/crates/api_crud/src/post/create.rs b/crates/api_crud/src/post/create.rs index 43a1338173..4264c26d4e 100644 --- a/crates/api_crud/src/post/create.rs +++ b/crates/api_crud/src/post/create.rs @@ -29,12 +29,12 @@ use lemmy_db_schema::{ use lemmy_db_views_actor::structs::CommunityView; use lemmy_utils::{ error::LemmyError, - SYNCHRONOUS_FEDERATION, spawn_try_task, utils::{ slurs::{check_slurs, check_slurs_opt}, validation::{check_url_scheme, clean_url_params, is_valid_body_field, is_valid_post_title}, }, + SYNCHRONOUS_FEDERATION, }; use tracing::Instrument; use url::Url; diff --git a/src/api_routes_http.rs b/src/api_routes_http.rs index 32247b48a4..cb735f807c 100644 --- a/src/api_routes_http.rs +++ b/src/api_routes_http.rs @@ -105,7 +105,7 @@ use lemmy_apub::{ }, SendActivity, }; -use lemmy_utils::{rate_limit::RateLimitCell, SYNCHRONOUS_FEDERATION, spawn_try_task}; +use lemmy_utils::{rate_limit::RateLimitCell, spawn_try_task, SYNCHRONOUS_FEDERATION}; use serde::Deserialize; pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) { diff --git a/src/lib.rs b/src/lib.rs index f496d8d6ae..b6fd64ec80 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,12 @@ use lemmy_db_schema::{ utils::{build_db_pool, get_database_url, run_migrations}, }; use lemmy_routes::{feeds, images, nodeinfo, webfinger}; -use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, settings::SETTINGS, SYNCHRONOUS_FEDERATION}; +use lemmy_utils::{ + error::LemmyError, + rate_limit::RateLimitCell, + settings::SETTINGS, + SYNCHRONOUS_FEDERATION, +}; use reqwest::Client; use reqwest_middleware::ClientBuilder; use reqwest_tracing::TracingMiddleware; From 990c3d508b03b1363ce55a3e8b5604e79e27df39 Mon Sep 17 00:00:00 2001 From: phiresky Date: Thu, 6 Jul 2023 14:48:07 +0000 Subject: [PATCH 7/8] empty From ef79de74c164bd70e95f3062345f9d3b8bc4411b Mon Sep 17 00:00:00 2001 From: phiresky Date: Thu, 6 Jul 2023 16:40:53 +0000 Subject: [PATCH 8/8] empty