From 2af98cd0f929e29c02bbb232a5c93cf346fccbc3 Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Wed, 5 Feb 2025 16:29:56 +0100 Subject: [PATCH] improve error messages for internal err --- backend/windmill-api/src/ai.rs | 22 ++--- backend/windmill-api/src/apps.rs | 17 ++-- backend/windmill-api/src/capture.rs | 2 +- .../windmill-api/src/concurrency_groups.rs | 4 +- backend/windmill-api/src/embeddings.rs | 4 +- backend/windmill-api/src/flows.rs | 22 ++--- backend/windmill-api/src/folders.rs | 2 +- backend/windmill-api/src/http_triggers.rs | 8 +- backend/windmill-api/src/job_helpers_ee.rs | 10 +- backend/windmill-api/src/jobs.rs | 28 +++--- .../src/postgres_triggers/handler.rs | 6 +- backend/windmill-api/src/raw_apps.rs | 4 +- backend/windmill-api/src/resources.rs | 8 +- backend/windmill-api/src/schedule.rs | 6 +- backend/windmill-api/src/scripts.rs | 14 +-- backend/windmill-api/src/service_logs.rs | 6 +- backend/windmill-api/src/settings.rs | 8 +- backend/windmill-api/src/users.rs | 6 +- backend/windmill-api/src/users_ee.rs | 4 +- backend/windmill-api/src/utils.rs | 4 +- backend/windmill-api/src/variables.rs | 6 +- .../windmill-api/src/websocket_triggers.rs | 2 +- backend/windmill-api/src/workspaces.rs | 16 ++-- backend/windmill-api/src/workspaces_ee.rs | 2 +- backend/windmill-api/src/workspaces_export.rs | 6 +- backend/windmill-common/src/auth.rs | 6 +- backend/windmill-common/src/cache.rs | 16 ++-- backend/windmill-common/src/error.rs | 14 ++- backend/windmill-common/src/flows.rs | 4 +- backend/windmill-common/src/jobs.rs | 2 +- backend/windmill-common/src/s3_helpers.rs | 4 +- backend/windmill-common/src/scripts.rs | 2 +- backend/windmill-common/src/variables.rs | 6 +- backend/windmill-queue/src/jobs.rs | 64 ++++++------- backend/windmill-queue/src/schedule.rs | 2 +- backend/windmill-worker/src/bun_executor.rs | 12 +-- backend/windmill-worker/src/common.rs | 16 ++-- .../windmill-worker/src/dedicated_worker.rs | 4 +- backend/windmill-worker/src/deno_executor.rs | 4 +- backend/windmill-worker/src/mysql_executor.rs | 2 +- .../windmill-worker/src/oracledb_executor.rs | 2 +- .../windmill-worker/src/python_executor.rs | 6 +- backend/windmill-worker/src/worker.rs | 39 ++++---- backend/windmill-worker/src/worker_flow.rs | 91 ++++++++++--------- .../windmill-worker/src/worker_lockfiles.rs | 30 +++--- 45 files changed, 283 insertions(+), 260 deletions(-) diff --git a/backend/windmill-api/src/ai.rs b/backend/windmill-api/src/ai.rs index ba055d27dea60..d77943df9d529 100644 --- a/backend/windmill-api/src/ai.rs +++ b/backend/windmill-api/src/ai.rs @@ -142,7 +142,7 @@ mod openai { tracing::debug!("Adding user to request body"); let mut json_body: HashMap> = serde_json::from_slice(&body) .map_err(|e| { - Error::InternalErr(format!("Failed to parse request body: {}", e)) + Error::internal_err(format!("Failed to parse request body: {}", e)) })?; let user_json_string = serde_json::Value::String(user.unwrap()).to_string(); // makes sure to escape characters @@ -150,12 +150,12 @@ mod openai { json_body.insert( "user".to_string(), RawValue::from_string(user_json_string) - .map_err(|e| Error::InternalErr(format!("Failed to parse user: {}", e)))?, + .map_err(|e| Error::internal_err(format!("Failed to parse user: {}", e)))?, ); body = serde_json::to_vec(&json_body) .map_err(|e| { - Error::InternalErr(format!("Failed to reserialize request body: {}", e)) + Error::internal_err(format!("Failed to reserialize request body: {}", e)) })? 
.into(); } @@ -204,13 +204,13 @@ mod openai { .send() .await .map_err(|err| { - Error::InternalErr(format!( + Error::internal_err(format!( "Failed to get OpenAI credentials using credentials flow: {}", err )) })?; let response = response.json::().await.map_err(|err| { - Error::InternalErr(format!( + Error::internal_err(format!( "Failed to parse OpenAI credentials from credentials flow: {}", err )) @@ -220,7 +220,7 @@ mod openai { pub async fn get_cached_value(db: &DB, w_id: &str, resource: Value) -> Result { let config = serde_json::from_value(resource) - .map_err(|e| Error::InternalErr(format!("validating openai resource {e:#}")))?; + .map_err(|e| Error::internal_err(format!("validating openai resource {e:#}")))?; let mut user = None::; let mut resource = match config { @@ -257,7 +257,7 @@ mod openai { let azure_base_path = if let Some(azure_base_path) = azure_base_path { Some( serde_json::from_value::(azure_base_path).map_err(|e| { - Error::InternalErr(format!("validating openai azure base path {e:#}")) + Error::internal_err(format!("validating openai azure base path {e:#}")) })?, ) } else { @@ -303,7 +303,7 @@ mod anthropic { pub async fn get_cached_value(db: &DB, w_id: &str, resource: Value) -> Result { let mut resource: AnthropicCache = serde_json::from_value(resource) - .map_err(|e| Error::InternalErr(format!("validating anthropic resource {e:#}")))?; + .map_err(|e| Error::internal_err(format!("validating anthropic resource {e:#}")))?; resource.api_key = get_variable_or_self(resource.api_key, db, w_id).await?; Ok(KeyCache::Anthropic(resource)) } @@ -335,7 +335,7 @@ mod mistral { pub async fn get_cached_value(db: &DB, w_id: &str, resource: Value) -> Result { let mut resource: MistralCache = serde_json::from_value(resource) - .map_err(|e| Error::InternalErr(format!("validating mistral resource {e:#}")))?; + .map_err(|e| Error::internal_err(format!("validating mistral resource {e:#}")))?; resource.api_key = get_variable_or_self(resource.api_key, db, w_id).await?; Ok(KeyCache::Mistral(resource)) } @@ -472,7 +472,7 @@ async fn proxy( .await?; if ai_resource.is_none() { - return Err(Error::InternalErr("AI resource not configured".to_string())); + return Err(Error::internal_err("AI resource not configured".to_string())); } let ai_resource = serde_json::from_value::(ai_resource.unwrap()) @@ -497,7 +497,7 @@ async fn proxy( }; if resource.is_none() { - return Err(Error::InternalErr(format!( + return Err(Error::internal_err(format!( "{:?} resource missing value", ai_provider ))); diff --git a/backend/windmill-api/src/apps.rs b/backend/windmill-api/src/apps.rs index bf134adc7f65d..bcfc72b4e06df 100644 --- a/backend/windmill-api/src/apps.rs +++ b/backend/windmill-api/src/apps.rs @@ -360,7 +360,7 @@ async fn list_apps( .fields(&["dm.deployment_msg"]); } - let sql = sqlb.sql().map_err(|e| Error::InternalErr(e.to_string()))?; + let sql = sqlb.sql().map_err(|e| Error::internal_err(e.to_string()))?; let mut tx = user_db.begin(&authed).await?; let rows = sqlx::query_as::<_, ListableApp>(&sql) .fetch_all(&mut *tx) @@ -611,7 +611,7 @@ async fn get_public_app_by_secret( let decrypted = mc .decrypt_bytes_to_bytes(&(hex::decode(secret)?)) - .map_err(|e| Error::InternalErr(e.to_string()))?; + .map_err(|e| Error::internal_err(e.to_string()))?; let bytes = str::from_utf8(&decrypted).map_err(to_anyhow)?; let id: i64 = bytes.parse().map_err(to_anyhow)?; @@ -958,7 +958,7 @@ async fn delete_app( .execute(&db) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "error deleting 
deployment metadata for script with path {path} in workspace {w_id}: {e:#}" )) })?; @@ -1061,7 +1061,7 @@ async fn update_app( sqlb.returning("path"); - let sql = sqlb.sql().map_err(|e| Error::InternalErr(e.to_string()))?; + let sql = sqlb.sql().map_err(|e| Error::internal_err(e.to_string()))?; let npath_o: Option = sqlx::query_scalar(&sql).fetch_optional(&mut *tx).await?; not_found_if_none(npath_o, "App", path)? } else { @@ -1710,7 +1710,7 @@ async fn upload_s3_file_from_app( } }; - let s3_resource = s3_resource_opt.ok_or(Error::InternalErr( + let s3_resource = s3_resource_opt.ok_or(Error::internal_err( "No files storage resource defined at the workspace level".to_string(), ))?; let s3_client = build_object_store_client(&s3_resource).await?; @@ -2034,7 +2034,10 @@ async fn build_args( safe_args.insert( k.to_string(), to_raw_value(&value.unwrap_or(Ok(serde_json::Value::Null)).map_err(|e| { - Error::InternalErr(format!("failed to serialize ctx variable for {}: {}", k, e)) + Error::internal_err(format!( + "failed to serialize ctx variable for {}: {}", + k, e + )) })?), ); } else if !arg_str.contains("\"$var:") && !arg_str.contains("\"$res:") { @@ -2054,7 +2057,7 @@ async fn build_args( ), ) .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "failed to remove sensitive variable(s)/resource(s) with error: {}", e )) diff --git a/backend/windmill-api/src/capture.rs b/backend/windmill-api/src/capture.rs index fe752336f3b6c..4b933b01b4225 100644 --- a/backend/windmill-api/src/capture.rs +++ b/backend/windmill-api/src/capture.rs @@ -412,7 +412,7 @@ async fn get_capture_trigger_config_and_owner( Ok(( serde_json::from_str(trigger_config.get()).map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "error parsing capture config for {} trigger: {}", kind, e )) diff --git a/backend/windmill-api/src/concurrency_groups.rs b/backend/windmill-api/src/concurrency_groups.rs index 945df75573a04..6a414d0a0cf6a 100644 --- a/backend/windmill-api/src/concurrency_groups.rs +++ b/backend/windmill-api/src/concurrency_groups.rs @@ -16,7 +16,7 @@ use sql_builder::bind::Bind; use sql_builder::SqlBuilder; use uuid::Uuid; use windmill_common::db::UserDB; -use windmill_common::error::Error::{InternalErr, PermissionDenied}; +use windmill_common::error::Error::PermissionDenied; use windmill_common::error::{self, JsonResult}; use windmill_common::utils::require_admin; @@ -82,7 +82,7 @@ async fn prune_concurrency_group( if n_job_uuids > 0 { tx.commit().await?; - return Err(InternalErr( + return Err(error::Error::internal_err( "Concurrency group is currently in use, unable to remove it. 
Retry later.".to_string(), )); } diff --git a/backend/windmill-api/src/embeddings.rs b/backend/windmill-api/src/embeddings.rs index fd222bdbfaf2f..30415156ced5a 100644 --- a/backend/windmill-api/src/embeddings.rs +++ b/backend/windmill-api/src/embeddings.rs @@ -90,7 +90,7 @@ async fn query_hub_scripts( Ok(Json(results)) } else { - Err(windmill_common::error::Error::InternalErr( + Err(windmill_common::error::Error::internal_err( "Embeddings db not initialized".to_string(), )) } @@ -124,7 +124,7 @@ async fn query_resource_types( Ok(Json(results)) } else { - Err(windmill_common::error::Error::InternalErr( + Err(windmill_common::error::Error::internal_err( "Embeddings db not initialized".to_string(), )) } diff --git a/backend/windmill-api/src/flows.rs b/backend/windmill-api/src/flows.rs index 1b283d43f361e..38ce53069ab31 100644 --- a/backend/windmill-api/src/flows.rs +++ b/backend/windmill-api/src/flows.rs @@ -187,7 +187,7 @@ async fn list_flows( .fields(&["dm.deployment_msg"]); } - let sql = sqlb.sql().map_err(|e| Error::InternalErr(e.to_string()))?; + let sql = sqlb.sql().map_err(|e| Error::internal_err(e.to_string()))?; let mut tx = user_db.begin(&authed).await?; let rows = sqlx::query_as::<_, ListableFlow>(&sql) .fetch_all(&mut *tx) @@ -704,7 +704,7 @@ async fn update_flow( w_id, ) .execute(&mut *tx) - .await.map_err(|e| error::Error::InternalErr(format!("Error updating flow due to flow update: {e:#}")))?; + .await.map_err(|e| error::Error::internal_err(format!("Error updating flow due to flow update: {e:#}")))?; if is_new_path { // if new path, must clone flow to new path and delete old flow for flow_version foreign key constraint @@ -721,7 +721,7 @@ async fn update_flow( .execute(&mut *tx) .await .map_err(|e| { - error::Error::InternalErr(format!("Error updating flow due to create new flow: {e:#}")) + error::Error::internal_err(format!("Error updating flow due to create new flow: {e:#}")) })?; sqlx::query!( @@ -733,7 +733,7 @@ async fn update_flow( .execute(&mut *tx) .await .map_err(|e| { - error::Error::InternalErr(format!( + error::Error::internal_err(format!( "Error updating flow due to updating flow history path: {e:#}" )) })?; @@ -746,7 +746,7 @@ async fn update_flow( .execute(&mut *tx) .await .map_err(|e| { - error::Error::InternalErr(format!( + error::Error::internal_err(format!( "Error updating flow due to deleting old flow: {e:#}" )) })?; @@ -781,7 +781,7 @@ async fn update_flow( .fetch_one(&mut *tx) .await .map_err(|e| { - error::Error::InternalErr(format!( + error::Error::internal_err(format!( "Error updating flow due to flow history insert: {e:#}" )) })?; @@ -805,7 +805,7 @@ async fn update_flow( .bind(&flow_path) .bind(&w_id) .fetch_all(&mut *tx) - .await.map_err(|e| error::Error::InternalErr(format!("Error updating flow due to related schedules update: {e:#}")))?; + .await.map_err(|e| error::Error::internal_err(format!("Error updating flow due to related schedules update: {e:#}")))?; let schedule = sqlx::query_as::<_, Schedule>( "UPDATE schedule SET path = $1, script_path = $1 WHERE path = $2 AND workspace_id = $3 AND is_flow IS true RETURNING *") @@ -813,7 +813,7 @@ async fn update_flow( .bind(&flow_path) .bind(&w_id) .fetch_optional(&mut *tx) - .await.map_err(|e| error::Error::InternalErr(format!("Error updating flow due to related schedule update: {e:#}")))?; + .await.map_err(|e| error::Error::internal_err(format!("Error updating flow due to related schedule update: {e:#}")))?; if let Some(schedule) = schedule { clear_schedule(&mut tx, &flow_path, &w_id).await?; @@ 
-907,7 +907,7 @@ async fn update_flow( .execute(&mut *new_tx) .await .map_err(|e| { - error::Error::InternalErr(format!( + error::Error::internal_err(format!( "Error updating flow due to updating dependency job field: {e:#}" )) })?; @@ -919,7 +919,7 @@ async fn update_flow( .execute(&mut *new_tx) .await .map_err(|e| { - error::Error::InternalErr(format!( + error::Error::internal_err(format!( "Error updating flow due to cancelling dependency job: {e:#}" )) })?; @@ -1204,7 +1204,7 @@ async fn delete_flow_by_path( .execute(&db) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "error deleting deployment metadata for script with path {path} in workspace {w_id}: {e:#}" )) })?; diff --git a/backend/windmill-api/src/folders.rs b/backend/windmill-api/src/folders.rs index 208ec6398598e..cab039c64fa14 100644 --- a/backend/windmill-api/src/folders.rs +++ b/backend/windmill-api/src/folders.rs @@ -340,7 +340,7 @@ async fn update_folder( let sql = sqlb .sql() - .map_err(|e| error::Error::InternalErr(e.to_string()))?; + .map_err(|e| error::Error::internal_err(e.to_string()))?; let nfolder = sqlx::query_as::<_, Folder>(&sql) .fetch_optional(&mut *tx) .await?; diff --git a/backend/windmill-api/src/http_triggers.rs b/backend/windmill-api/src/http_triggers.rs index ef62928f2c5c1..d0a0b43698d2b 100644 --- a/backend/windmill-api/src/http_triggers.rs +++ b/backend/windmill-api/src/http_triggers.rs @@ -176,7 +176,7 @@ async fn list_triggers( } let sql = sqlb .sql() - .map_err(|e| error::Error::InternalErr(e.to_string()))?; + .map_err(|e| error::Error::internal_err(e.to_string()))?; let rows = sqlx::query_as::<_, Trigger>(&sql) .fetch_all(&mut *tx) .await?; @@ -590,7 +590,7 @@ async fn route_job( #[cfg(not(feature = "parquet"))] if trigger.static_asset_config.is_some() { - return error::Error::InternalErr( + return error::Error::internal_err( "Static asset configuration is not supported in this build".to_string(), ) .into_response(); @@ -608,14 +608,14 @@ async fn route_job( config.storage, ) .await?; - let s3_resource = s3_resource_opt.ok_or(error::Error::InternalErr( + let s3_resource = s3_resource_opt.ok_or(error::Error::internal_err( "No files storage resource defined at the workspace level".to_string(), ))?; let s3_client = build_object_store_client(&s3_resource).await?; let path = object_store::path::Path::from(config.s3); let s3_object = s3_client.get(&path).await.map_err(|err| { tracing::warn!("Error retrieving file from S3: {:?}", err); - error::Error::InternalErr(format!("Error retrieving file: {}", err.to_string())) + error::Error::internal_err(format!("Error retrieving file: {}", err.to_string())) })?; let mut response_headers = http::HeaderMap::new(); if let Some(ref e_tag) = s3_object.meta.e_tag { diff --git a/backend/windmill-api/src/job_helpers_ee.rs b/backend/windmill-api/src/job_helpers_ee.rs index 7b074a300a2c2..b35f40d661d42 100644 --- a/backend/windmill-api/src/job_helpers_ee.rs +++ b/backend/windmill-api/src/job_helpers_ee.rs @@ -69,7 +69,7 @@ pub async fn get_s3_resource<'c>( _resource_type: Option, _job_id: Option, ) -> error::Result { - Err(error::Error::InternalErr( + Err(error::Error::internal_err( "Not implemented in Windmill's Open Source repository".to_string(), )) } @@ -81,7 +81,7 @@ pub async fn upload_file_from_req( _req: axum::extract::Request, _options: PutMultipartOpts, ) -> error::Result<()> { - Err(error::Error::InternalErr( + Err(error::Error::internal_err( "Not implemented in Windmill's Open Source repository".to_string(), )) } @@ -93,7 
+93,7 @@ pub async fn upload_file_internal( _stream: impl Stream> + Unpin, _options: PutMultipartOpts, ) -> error::Result<()> { - Err(error::Error::InternalErr( + Err(error::Error::internal_err( "Not implemented in Windmill's Open Source repository".to_string(), )) } @@ -107,7 +107,7 @@ pub async fn download_s3_file_internal( _w_id: &str, _query: DownloadFileQuery, ) -> error::Result { - Err(error::Error::InternalErr( + Err(error::Error::internal_err( "Not implemented in Windmill's Open Source repository".to_string(), )) } @@ -120,7 +120,7 @@ pub async fn load_image_preview_internal( _w_id: &str, _query: LoadImagePreviewQuery, ) -> error::Result { - Err(error::Error::InternalErr( + Err(error::Error::internal_err( "Not implemented in Windmill's Open Source repository".to_string(), )) } diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index f039c220fb85b..4e477ab8914d8 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -384,7 +384,7 @@ async fn cancel_job_api( ) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "timeout after 120s while cancelling job {id} in {w_id}: {e:#}" )) })??; @@ -495,7 +495,7 @@ async fn force_cancel( ) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "timeout after 120s while cancelling job {id} in {w_id}: {e:#}" )) })??; @@ -554,7 +554,7 @@ pub async fn get_path_tag_limits_cache_for_hash( .fetch_optional(&mut *tx) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "querying getting path for hash {hash} in {w_id}: {e:#}" )) })?.ok_or_else(|| Error::NotFound(format!( @@ -3682,10 +3682,10 @@ pub async fn run_wait_result( if let Some(windmill_headers) = windmill_headers { for (k, v) in windmill_headers { let k = HeaderName::from_str(k.as_str()).map_err(|err| { - Error::InternalErr(format!("Invalid header name {k}: {err}")) + Error::internal_err(format!("Invalid header name {k}: {err}")) })?; let v = HeaderValue::from_str(v.as_str()).map_err(|err| { - Error::InternalErr(format!("Invalid header value {v}: {err}")) + Error::internal_err(format!("Invalid header value {v}: {err}")) })?; headers.insert(k, v); } @@ -3704,7 +3704,7 @@ pub async fn run_wait_result( headers.insert( http::header::CONTENT_TYPE, HeaderValue::from_str(content_type.as_str()).map_err(|err| { - Error::InternalErr(format!("Invalid content type {content_type}: {err}")) + Error::internal_err(format!("Invalid content type {content_type}: {err}")) })?, ); return Ok((status_code_or_default, headers, serialized_result).into_response()); @@ -3760,7 +3760,7 @@ pub async fn check_queue_too_long(db: &DB, queue_limit: Option) -> error::R .unwrap_or(0); if count > queue_limit.unwrap() { - return Err(Error::InternalErr(format!( + return Err(Error::internal_err(format!( "Number of queued job is too high: {count} > {limit}" ))); } @@ -3872,7 +3872,7 @@ pub async fn run_wait_result_job_by_path_get( return Ok(Json(serde_json::json!("")).into_response()); } let payload_r = run_query.payload.map(decode_payload).map(|x| { - x.map_err(|e| Error::InternalErr(format!("Impossible to decode query payload: {e:#?}"))) + x.map_err(|e| Error::internal_err(format!("Impossible to decode query payload: {e:#?}"))) }); let mut payload_args = if let Some(payload) = payload_r { @@ -3967,7 +3967,7 @@ pub async fn run_wait_result_flow_by_path_get( } let payload_r = run_query.payload.clone().map(decode_payload).map(|x| { x.map_err(|e| { - 
error::Error::InternalErr(format!("Impossible to decode query payload: {e:#?}")) + error::Error::internal_err(format!("Impossible to decode query payload: {e:#?}")) }) }); @@ -4551,7 +4551,7 @@ async fn run_dependencies_job( } if req.raw_scripts.len() != 1 || req.raw_scripts[0].script_path != req.entrypoint { - return Err(error::Error::InternalErr( + return Err(error::Error::internal_err( "For now only a single raw script can be passed to this endpoint, and the entrypoint should be set to the script path".to_string(), )); } @@ -4790,10 +4790,10 @@ async fn add_batch_jobs( ) .fetch_optional(&mut *tx) .await? - .ok_or_else(|| Error::InternalErr(format!("not found flow at path {:?}", path)))?; + .ok_or_else(|| Error::internal_err(format!("not found flow at path {:?}", path)))?; let value = serde_json::from_str::(value_json.value.get()).map_err(|err| { - Error::InternalErr(format!( + Error::internal_err(format!( "could not convert json to flow for {path}: {err:?}" )) })?; @@ -5138,7 +5138,7 @@ async fn get_log_file(Path((_w_id, file_p)): Path<(String, String)>) -> error::R .unwrap(); return Ok(res); } else { - return Err(error::Error::InternalErr(format!( + return Err(error::Error::internal_err(format!( "Error getting bytes from file: {}", file_p ))); @@ -5150,7 +5150,7 @@ async fn get_log_file(Path((_w_id, file_p)): Path<(String, String)>) -> error::R ))); } } else { - return Err(error::Error::InternalErr(format!( + return Err(error::Error::internal_err(format!( "Object store client not present and file not found on server logs volume at {local_file}" ))); } diff --git a/backend/windmill-api/src/postgres_triggers/handler.rs b/backend/windmill-api/src/postgres_triggers/handler.rs index 5bcfb69acc18e..ad23938e5eb6d 100644 --- a/backend/windmill-api/src/postgres_triggers/handler.rs +++ b/backend/windmill-api/src/postgres_triggers/handler.rs @@ -428,17 +428,17 @@ pub async fn list_postgres_triggers( } let sql = sqlb .sql() - .map_err(|e| error::Error::InternalErr(e.to_string()))?; + .map_err(|e| error::Error::internal_err(e.to_string()))?; let rows = sqlx::query_as::<_, PostgresTrigger>(&sql) .fetch_all(&mut *tx) .await .map_err(|e| { tracing::debug!("Error fetching postgres_trigger: {:#?}", e); - windmill_common::error::Error::InternalErr("server error".to_string()) + windmill_common::error::Error::internal_err("server error".to_string()) })?; tx.commit().await.map_err(|e| { tracing::debug!("Error commiting postgres_trigger: {:#?}", e); - windmill_common::error::Error::InternalErr("server error".to_string()) + windmill_common::error::Error::internal_err("server error".to_string()) })?; Ok(Json(rows)) diff --git a/backend/windmill-api/src/raw_apps.rs b/backend/windmill-api/src/raw_apps.rs index 59f2471c4d170..bb2763b73debd 100644 --- a/backend/windmill-api/src/raw_apps.rs +++ b/backend/windmill-api/src/raw_apps.rs @@ -110,7 +110,7 @@ async fn list_apps( sqlb.and_where_eq("app.path", "?".bind(path_exact)); } - let sql = sqlb.sql().map_err(|e| Error::InternalErr(e.to_string()))?; + let sql = sqlb.sql().map_err(|e| Error::internal_err(e.to_string()))?; let mut tx = user_db.begin(&authed).await?; let rows = sqlx::query_as::<_, ListableApp>(&sql) .fetch_all(&mut *tx) @@ -293,7 +293,7 @@ async fn update_app( sqlb.returning("path"); - let sql = sqlb.sql().map_err(|e| Error::InternalErr(e.to_string()))?; + let sql = sqlb.sql().map_err(|e| Error::internal_err(e.to_string()))?; let npath_o: Option = sqlx::query_scalar(&sql).fetch_optional(&mut *tx).await?; not_found_if_none(npath_o, "Raw App", path)?; diff 
--git a/backend/windmill-api/src/resources.rs b/backend/windmill-api/src/resources.rs index 88f8d79c29e46..564f9977b2f4f 100644 --- a/backend/windmill-api/src/resources.rs +++ b/backend/windmill-api/src/resources.rs @@ -262,7 +262,7 @@ async fn list_resources( sqlb.and_where_like_left("resource.path", path_start); } - let sql = sqlb.sql().map_err(|e| Error::InternalErr(e.to_string()))?; + let sql = sqlb.sql().map_err(|e| Error::internal_err(e.to_string()))?; let mut tx = user_db.begin(&authed).await?; let rows = sqlx::query_as::<_, ListableResource>(&sql) .fetch_all(&mut *tx) @@ -516,7 +516,7 @@ pub async fn transform_json_value<'c>( Value::String(y) if y.starts_with("$res:") => { let path = y.strip_prefix("$res:").unwrap(); if path.split("/").count() < 2 { - return Err(Error::InternalErr(format!("Invalid resource path: {path}"))); + return Err(Error::internal_err(format!("Invalid resource path: {path}"))); } let mut tx: Transaction<'_, Postgres> = authed_transaction_or_default(authed, user_db.clone(), db).await?; @@ -815,7 +815,7 @@ async fn update_resource( } } - let sql = sqlb.sql().map_err(|e| Error::InternalErr(e.to_string()))?; + let sql = sqlb.sql().map_err(|e| Error::internal_err(e.to_string()))?; let npath_o: Option = sqlx::query_scalar(&sql).fetch_optional(&mut *tx).await?; let npath = not_found_if_none(npath_o, "Resource", path)?; @@ -1165,7 +1165,7 @@ async fn update_resource_type( sqlb.set_str("description", ndesc); } sqlb.set_str("edited_at", "now()"); - let sql = sqlb.sql().map_err(|e| Error::InternalErr(e.to_string()))?; + let sql = sqlb.sql().map_err(|e| Error::internal_err(e.to_string()))?; let mut tx = user_db.begin(&authed).await?; sqlx::query(&sql).execute(&mut *tx).await?; diff --git a/backend/windmill-api/src/schedule.rs b/backend/windmill-api/src/schedule.rs index 79e8c4dcb49db..5b22499ca9e05 100644 --- a/backend/windmill-api/src/schedule.rs +++ b/backend/windmill-api/src/schedule.rs @@ -196,7 +196,7 @@ async fn create_schedule( .bind(&ns.cron_version.unwrap_or("v2".to_string())) .fetch_one(&mut *tx) .await - .map_err(|e| Error::InternalErr(format!("inserting schedule in {w_id}: {e:#}")))?; + .map_err(|e| Error::internal_err(format!("inserting schedule in {w_id}: {e:#}")))?; handle_deployment_metadata( &authed.email, @@ -282,7 +282,7 @@ async fn edit_schedule( .bind(&es.cron_version) .fetch_one(&mut *tx) .await - .map_err(|e| Error::InternalErr(format!("updating schedule in {w_id}: {e:#}")))?; + .map_err(|e| Error::internal_err(format!("updating schedule in {w_id}: {e:#}")))?; handle_deployment_metadata( &authed.email, @@ -356,7 +356,7 @@ async fn list_schedule( if let Some(path_start) = &lsq.path_start { sqlb.and_where_like_left("path", path_start); } - let sql = sqlb.sql().map_err(|e| Error::InternalErr(e.to_string()))?; + let sql = sqlb.sql().map_err(|e| Error::internal_err(e.to_string()))?; let rows = sqlx::query_as::<_, Schedule>(&sql) .fetch_all(&mut *tx) .await?; diff --git a/backend/windmill-api/src/scripts.rs b/backend/windmill-api/src/scripts.rs index 10ac5aa387648..cbeb13e10690b 100644 --- a/backend/windmill-api/src/scripts.rs +++ b/backend/windmill-api/src/scripts.rs @@ -311,7 +311,7 @@ async fn list_scripts( .fields(&["dm.deployment_msg"]); } - let sql = sqlb.sql().map_err(|e| Error::InternalErr(e.to_string()))?; + let sql = sqlb.sql().map_err(|e| Error::internal_err(e.to_string()))?; let mut tx = user_db.begin(&authed).await?; let rows = sqlx::query_as::<_, ListableScript>(&sql) .fetch_all(&mut *tx) @@ -1337,7 +1337,7 @@ async fn 
archive_script_by_path( ) .fetch_one(&db) .await - .map_err(|e| Error::InternalErr(format!("archiving script in {w_id}: {e:#}")))?; + .map_err(|e| Error::internal_err(format!("archiving script in {w_id}: {e:#}")))?; audit_log( &mut *tx, &authed, @@ -1387,7 +1387,7 @@ async fn archive_script_by_hash( .bind(&hash.0) .fetch_one(&mut *tx) .await - .map_err(|e| Error::InternalErr(format!("archiving script in {w_id}: {e:#}")))?; + .map_err(|e| Error::internal_err(format!("archiving script in {w_id}: {e:#}")))?; audit_log( &mut *tx, @@ -1427,7 +1427,7 @@ async fn delete_script_by_hash( .bind(&w_id) .fetch_one(&db) .await - .map_err(|e| Error::InternalErr(format!("deleting script by hash {w_id}: {e:#}")))?; + .map_err(|e| Error::internal_err(format!("deleting script by hash {w_id}: {e:#}")))?; audit_log( &mut *tx, @@ -1487,7 +1487,7 @@ async fn delete_script_by_path( ) .fetch_one(&db) .await - .map_err(|e| Error::InternalErr(format!("deleting script by path {w_id}: {e:#}")))? + .map_err(|e| Error::internal_err(format!("deleting script by path {w_id}: {e:#}")))? } else { // If the script is draft only, we can delete it without admin permissions but we still need write permissions sqlx::query_scalar!( @@ -1497,7 +1497,7 @@ async fn delete_script_by_path( ) .fetch_one(&mut *tx) .await - .map_err(|e| Error::InternalErr(format!("deleting script by path {w_id}: {e:#}")))? + .map_err(|e| Error::internal_err(format!("deleting script by path {w_id}: {e:#}")))? }; sqlx::query!( @@ -1559,7 +1559,7 @@ async fn delete_script_by_path( .execute(&db) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "error deleting deployment metadata for script with path {path} in workspace {w_id}: {e:#}" )) })?; diff --git a/backend/windmill-api/src/service_logs.rs b/backend/windmill-api/src/service_logs.rs index 277a08bca99f5..b11646fbe05cb 100644 --- a/backend/windmill-api/src/service_logs.rs +++ b/backend/windmill-api/src/service_logs.rs @@ -83,7 +83,7 @@ async fn list_files( if let Some(true) = lq.with_error { sqlb.and_where("err_lines > 0"); } - let sql = sqlb.sql().map_err(|e| Error::InternalErr(e.to_string()))?; + let sql = sqlb.sql().map_err(|e| Error::internal_err(e.to_string()))?; let rows = sqlx::query_as::<_, LogFile>(&sql).fetch_all(&db).await?; Ok(Json(rows)) } @@ -114,7 +114,7 @@ async fn get_log_file( return Ok(content_plain(Body::from(bytes::Bytes::from(bytes)))); } Err(e) => { - return Err(Error::InternalErr(format!( + return Err(Error::internal_err(format!( "Error pulling the bytes: {}", e ))); @@ -122,7 +122,7 @@ async fn get_log_file( } } Err(e) => { - return Err(Error::InternalErr(format!( + return Err(Error::internal_err(format!( "Error fetching the file: {}", e ))); diff --git a/backend/windmill-api/src/settings.rs b/backend/windmill-api/src/settings.rs index c527eb481a2fd..37a33ee81ac80 100644 --- a/backend/windmill-api/src/settings.rs +++ b/backend/windmill-api/src/settings.rs @@ -132,7 +132,7 @@ pub async fn test_s3_bucket( if first_file.is_some() { if let Err(e) = first_file.as_ref().unwrap() { tracing::error!("error listing bucket: {e:#}"); - error::Error::InternalErr(format!("Failed to list files in blob storage: {e:#}")); + error::Error::internal_err(format!("Failed to list files in blob storage: {e:#}")); } tracing::info!("Listed files: {:?}", first_file.unwrap()); } else { @@ -156,7 +156,7 @@ pub async fn test_s3_bucket( .await .map_err(to_anyhow)?; if content != Bytes::from_static(b"hello") { - return Err(error::Error::InternalErr( + return 
Err(error::Error::internal_err( "Failed to read back from blob storage".to_string(), )); } @@ -232,7 +232,7 @@ pub async fn set_global_setting_internal( generate_instance_username_for_all_users(db) .await .map_err(|err| { - error::Error::InternalErr(format!( + error::Error::internal_err(format!( "Failed to generate instance wide usernames: {}", err )) @@ -349,7 +349,7 @@ pub async fn get_latest_key_renewal_attempt( Some(last_attempt) => { let last_attempt_result = serde_json::from_value::(last_attempt.value) .map_err(|e| { - error::Error::InternalErr(format!("Failed to parse last attempt: {}", e)) + error::Error::internal_err(format!("Failed to parse last attempt: {}", e)) })?; Ok(Json(Some(KeyRenewalAttempt { result: last_attempt_result, diff --git a/backend/windmill-api/src/users.rs b/backend/windmill-api/src/users.rs index c0159480e7eb0..2424c5e35f670 100644 --- a/backend/windmill-api/src/users.rs +++ b/backend/windmill-api/src/users.rs @@ -487,7 +487,7 @@ async fn list_user_usage( .fetch_all(&mut *tx), ) .await - .map_err(|e| Error::InternalErr(format!("Timed out while fetching user usage: {e:#}")))??; + .map_err(|e| Error::internal_err(format!("Timed out while fetching user usage: {e:#}")))??; tx.commit().await?; Ok(Json(rows)) } @@ -695,7 +695,7 @@ async fn global_whoami( ) .fetch_one(&db) .await - .map_err(|e| Error::InternalErr(format!("fetching global identity: {e:#}"))); + .map_err(|e| Error::internal_err(format!("fetching global identity: {e:#}"))); if let Ok(user) = user { Ok(Json(user)) @@ -1591,7 +1591,7 @@ async fn login( if let Some((email, hash, super_admin, first_time_user)) = email_w_h { let parsed_hash = - PasswordHash::new(&hash).map_err(|e| Error::InternalErr(e.to_string()))?; + PasswordHash::new(&hash).map_err(|e| Error::internal_err(e.to_string()))?; if argon2 .verify_password(password.as_bytes(), &parsed_hash) .is_err() diff --git a/backend/windmill-api/src/users_ee.rs b/backend/windmill-api/src/users_ee.rs index 0e20d5bf841cd..7a11239a2ff1d 100644 --- a/backend/windmill-api/src/users_ee.rs +++ b/backend/windmill-api/src/users_ee.rs @@ -17,7 +17,7 @@ pub async fn create_user( _argon2: Arc>, mut _nu: NewUser, ) -> Result<(StatusCode, String)> { - Err(Error::InternalErr( + Err(Error::internal_err( "Not implemented in Windmill's Open Source repository".to_string(), )) } @@ -29,7 +29,7 @@ pub async fn set_password( _user_email: &str, _ep: EditPassword, ) -> Result { - Err(Error::InternalErr( + Err(Error::internal_err( "Not implemented in Windmill's Open Source repository".to_string(), )) } diff --git a/backend/windmill-api/src/utils.rs b/backend/windmill-api/src/utils.rs index 5e7adde1e65d3..439944e2b229f 100644 --- a/backend/windmill-api/src/utils.rs +++ b/backend/windmill-api/src/utils.rs @@ -76,7 +76,7 @@ pub async fn generate_instance_wide_unique_username<'c>( let mut i = 1; while username_conflict { if i > 1000 { - return Err(Error::InternalErr(format!( + return Err(Error::internal_err(format!( "too many username conflicts for {}", email ))); @@ -168,7 +168,7 @@ pub async fn get_instance_username_or_create_pending<'c>( ) .execute(&mut **tx) .await - .map_err(|e| Error::InternalErr(format!("creating pending user: {e:#}")))?; + .map_err(|e| Error::internal_err(format!("creating pending user: {e:#}")))?; Ok(username) } diff --git a/backend/windmill-api/src/variables.rs b/backend/windmill-api/src/variables.rs index 48800621b6efb..37960e8583cf6 100644 --- a/backend/windmill-api/src/variables.rs +++ b/backend/windmill-api/src/variables.rs @@ -196,7 +196,7 @@ async fn 
get_variable( ) } #[cfg(not(feature = "oauth2"))] - return Err(Error::InternalErr("Require oauth2 feature".to_string())); + return Err(Error::internal_err("Require oauth2 feature".to_string())); } else if !value.is_empty() && decrypt_secret { let _ = tx.commit().await; let mc = build_crypt(&db, &w_id).await?; @@ -558,7 +558,7 @@ async fn update_variable( } } - let sql = sqlb.sql().map_err(|e| Error::InternalErr(e.to_string()))?; + let sql = sqlb.sql().map_err(|e| Error::internal_err(e.to_string()))?; let npath_o: Option = sqlx::query_scalar(&sql).fetch_optional(&mut *tx).await?; @@ -662,7 +662,7 @@ pub async fn get_value_internal<'c>( .await? } #[cfg(not(feature = "oauth2"))] - return Err(Error::InternalErr("Require oauth2 feature".to_string())); + return Err(Error::internal_err("Require oauth2 feature".to_string())); } else if !value.is_empty() { tx.commit().await?; let mc = build_crypt(&db, &w_id).await?; diff --git a/backend/windmill-api/src/websocket_triggers.rs b/backend/windmill-api/src/websocket_triggers.rs index f90bb8def0f51..a28ed014254a2 100644 --- a/backend/windmill-api/src/websocket_triggers.rs +++ b/backend/windmill-api/src/websocket_triggers.rs @@ -154,7 +154,7 @@ async fn list_websocket_triggers( } let sql = sqlb .sql() - .map_err(|e| error::Error::InternalErr(e.to_string()))?; + .map_err(|e| error::Error::internal_err(e.to_string()))?; let rows = sqlx::query_as::<_, WebsocketTrigger>(&sql) .fetch_all(&mut *tx) .await?; diff --git a/backend/windmill-api/src/workspaces.rs b/backend/windmill-api/src/workspaces.rs index 3b9fcc7049f43..af074c2d81806 100644 --- a/backend/windmill-api/src/workspaces.rs +++ b/backend/windmill-api/src/workspaces.rs @@ -412,7 +412,7 @@ async fn get_settings( ) .fetch_one(&mut *tx) .await - .map_err(|e| Error::InternalErr(format!("getting settings: {e:#}")))?; + .map_err(|e| Error::internal_err(format!("getting settings: {e:#}")))?; tx.commit().await?; Ok(Json(settings)) @@ -435,7 +435,7 @@ async fn get_deploy_to( ) .fetch_one(&mut *tx) .await - .map_err(|e| Error::InternalErr(format!("getting deploy_to: {e:#}")))?; + .map_err(|e| Error::internal_err(format!("getting deploy_to: {e:#}")))?; tx.commit().await?; Ok(Json(settings)) @@ -744,7 +744,7 @@ async fn get_copilot_info( ) .fetch_one(&mut *tx) .await - .map_err(|e| Error::InternalErr(format!("getting ai_resource and code_completion_model: {e:#}")))?; + .map_err(|e| Error::internal_err(format!("getting ai_resource and code_completion_model: {e:#}")))?; tx.commit().await?; let (ai_provider, exists_ai_resource) = if let Some(ai_resource) = record.ai_resource { @@ -788,7 +788,7 @@ async fn edit_large_file_storage_config( if let Some(lfs_config) = new_config.large_file_storage { let serialized_lfs_config = serde_json::to_value::(lfs_config) - .map_err(|err| Error::InternalErr(err.to_string()))?; + .map_err(|err| Error::internal_err(err.to_string()))?; sqlx::query!( "UPDATE workspace_settings SET large_file_storage = $1 WHERE workspace_id = $2", @@ -857,7 +857,7 @@ async fn edit_git_sync_config( if let Some(git_sync_settings) = new_config.git_sync_settings { let serialized_config = serde_json::to_value::(git_sync_settings) - .map_err(|err| Error::InternalErr(err.to_string()))?; + .map_err(|err| Error::internal_err(err.to_string()))?; sqlx::query!( "UPDATE workspace_settings SET git_sync = $1 WHERE workspace_id = $2", @@ -923,7 +923,7 @@ async fn edit_deploy_ui_config( if let Some(deploy_ui_settings) = new_config.deploy_ui_settings { let serialized_config = serde_json::to_value::(deploy_ui_settings) - 
.map_err(|err| Error::InternalErr(err.to_string()))?; + .map_err(|err| Error::internal_err(err.to_string()))?; sqlx::query!( "UPDATE workspace_settings SET deploy_ui = $1 WHERE workspace_id = $2", @@ -1018,7 +1018,7 @@ async fn get_default_scripts( ) .fetch_optional(&mut *tx) .await - .map_err(|err| Error::InternalErr(format!("getting default_app: {err}")))?; + .map_err(|err| Error::internal_err(format!("getting default_app: {err}")))?; tx.commit().await?; Ok(Json(default_scripts.flatten())) @@ -1092,7 +1092,7 @@ async fn get_default_app( ) .fetch_one(&mut *tx) .await - .map_err(|err| Error::InternalErr(format!("getting default_app: {err}")))?; + .map_err(|err| Error::internal_err(format!("getting default_app: {err}")))?; tx.commit().await?; Ok(Json(WorkspaceDefaultApp { default_app_path })) diff --git a/backend/windmill-api/src/workspaces_ee.rs b/backend/windmill-api/src/workspaces_ee.rs index d5fd2cdda1cec..aa8799e233a0a 100644 --- a/backend/windmill-api/src/workspaces_ee.rs +++ b/backend/windmill-api/src/workspaces_ee.rs @@ -9,7 +9,7 @@ pub async fn edit_auto_invite( _w_id: String, _ea: EditAutoInvite, ) -> windmill_common::error::Result { - Err(windmill_common::error::Error::InternalErr( + Err(windmill_common::error::Error::internal_err( "Not implemented on OSS".to_string(), )) } diff --git a/backend/windmill-api/src/workspaces_export.rs b/backend/windmill-api/src/workspaces_export.rs index 8e7783d8c0252..1f55d0172a229 100644 --- a/backend/windmill-api/src/workspaces_export.rs +++ b/backend/windmill-api/src/workspaces_export.rs @@ -560,7 +560,7 @@ pub(crate) async fn tarball_workspace( for group in groups { let extra_perms: HashMap = serde_json::from_value(group.extra_perms) .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "Error parsing extra_perms for group {}: {}", group.name, e )) })?; @@ -638,7 +638,7 @@ pub(crate) async fn tarball_workspace( .map(|v| serde_json::to_string_pretty(&v).ok()) .ok() .flatten() - .ok_or_else(|| Error::InternalErr("Error serializing settings".to_string()))?; + .ok_or_else(|| Error::internal_err("Error serializing settings".to_string()))?; archive .write_to_archive(&settings_str, "settings.json") .await?; @@ -657,7 +657,7 @@ pub(crate) async fn tarball_workspace( .map(|v| serde_json::to_string_pretty(&v).ok()) .ok() .flatten() - .ok_or_else(|| Error::InternalErr("Error serializing enryption key".to_string()))?; + .ok_or_else(|| Error::internal_err("Error serializing encryption key".to_string()))?; archive .write_to_archive(&key_json, "encryption_key.json") .await?; diff --git a/backend/windmill-common/src/auth.rs b/backend/windmill-common/src/auth.rs index 017131388dd75..52a966b105a1f 100644 --- a/backend/windmill-common/src/auth.rs +++ b/backend/windmill-common/src/auth.rs @@ -67,7 +67,7 @@ pub async fn is_super_admin_email(db: &DB, email: &str) -> Result<bool> { let is_admin = sqlx::query_scalar!("SELECT super_admin FROM password WHERE email = $1", email) .fetch_optional(db) .await - .map_err(|e| Error::InternalErr(format!("fetching super admin: {e:#}")))? + .map_err(|e| Error::internal_err(format!("fetching super admin: {e:#}")))? .unwrap_or(false); Ok(is_admin) @@ -81,7 +81,7 @@ pub async fn is_devops_email(db: &DB, email: &str) -> Result<bool> { let is_devops = sqlx::query_scalar!("SELECT devops FROM password WHERE email = $1", email) .fetch_optional(db) .await - .map_err(|e| Error::InternalErr(format!("fetching super admin: {e:#}")))? + .map_err(|e| Error::internal_err(format!("fetching devops: {e:#}")))?
.unwrap_or(false); Ok(is_devops) @@ -123,7 +123,7 @@ pub async fn fetch_authed_from_permissioned_as( if let Some(r) = r { (r.is_admin, r.operator) } else { - return Err(Error::InternalErr(format!( + return Err(Error::internal_err(format!( "user {name} not found in workspace {w_id}" ))); } diff --git a/backend/windmill-common/src/cache.rs b/backend/windmill-common/src/cache.rs index 83cec0b417462..5a65b35610d23 100644 --- a/backend/windmill-common/src/cache.rs +++ b/backend/windmill-common/src/cache.rs @@ -389,7 +389,7 @@ pub mod flow { async move { fetch_node.await.and_then(|data| match data { RawData::Script(data) => Ok(data), - RawData::Flow(_) => Err(error::Error::InternalErr(format!( + RawData::Flow(_) => Err(error::Error::internal_err(format!( "Flow node ({:x}) isn't a script node.", node.0 ))), @@ -410,7 +410,7 @@ pub mod flow { async move { fetch_node.await.and_then(|data| match data { RawData::Flow(data) => Ok(data), - RawData::Script(_) => Err(error::Error::InternalErr(format!( + RawData::Script(_) => Err(error::Error::internal_err(format!( "Flow node ({:x}) isn't a flow node.", node.0 ))), @@ -639,7 +639,7 @@ pub mod job { async move { fetch_preview.await.and_then(|data| match data { RawData::Flow(data) => Ok(data), - RawData::Script(_) => Err(error::Error::InternalErr(format!( + RawData::Script(_) => Err(error::Error::internal_err(format!( "Job ({job}) isn't a flow job." ))), }) @@ -659,7 +659,7 @@ pub mod job { async move { fetch_preview.await.and_then(|data| match data { RawData::Script(data) => Ok(data), - RawData::Flow(_) => Err(error::Error::InternalErr(format!( + RawData::Flow(_) => Err(error::Error::internal_err(format!( "Job ({job}) isn't a script job." ))), }) @@ -723,7 +723,7 @@ pub mod job { .await .map(|(data, _meta)| data), (AppScript, Some(id)) => app::fetch_script(e, AppScriptId(id)).await, - _ => Err(error::Error::InternalErr(format!( + _ => Err(error::Error::internal_err(format!( "Isn't a script job: {:?}", kind ))), @@ -748,7 +748,7 @@ pub mod job { Ok(raw_flow) => Ok(raw_flow), Err(_) => flow::fetch_version(e, id).await, }, - _ => Err(error::Error::InternalErr(format!( + _ => Err(error::Error::internal_err(format!( "Isn't a flow job {:?}", kind ))), @@ -807,7 +807,7 @@ const _: () = { fn resolve(mut src: Self::Untrusted) -> error::Result { let Some(meta) = src.meta.take() else { - return Err(error::Error::InternalErr("Invalid script src".to_string())); + return Err(error::Error::internal_err("Invalid script src".to_string())); }; Ok(ScriptFull { data: Arc::new(ScriptData { code: src.content, lock: src.lock }), @@ -842,7 +842,7 @@ const _: () = { RawNode { raw_code: Some(code), raw_lock: lock, .. 
} => { Ok(Self::Script(Arc::new(ScriptData { code, lock }))) } - _ => Err(error::Error::InternalErr( + _ => Err(error::Error::internal_err( "Invalid raw data src".to_string(), )), } diff --git a/backend/windmill-common/src/error.rs b/backend/windmill-common/src/error.rs index 2fe00bc4ab1a6..3e3813e9eb67d 100644 --- a/backend/windmill-common/src/error.rs +++ b/backend/windmill-common/src/error.rs @@ -54,6 +54,8 @@ pub enum Error { QuotaExceeded(String), #[error("Internal: {0}")] InternalErr(String), + #[error("Internal: {message} @{location}")] + InternalErrLoc { message: String, location: String }, #[error("Internal: {0}: {1}")] InternalErrAt(&'static Location<'static>, String), #[error("HexErr: {error:#} @{location:#}")] @@ -143,10 +145,20 @@ impl Error { pub fn relocate_internal(self, loc: &'static Location<'static>) -> Self { match self { - Self::InternalErr(s) | Self::InternalErrAt(_, s) => Self::InternalErrAt(loc, s), + Self::InternalErrLoc { message, .. } | Self::InternalErrAt(_, message) => { + Self::InternalErrAt(loc, message) + } _ => self, } } + + #[track_caller] + pub fn internal_err<T: AsRef<str>>(msg: T) -> Self { + Self::InternalErrLoc { + message: msg.as_ref().to_string(), + location: prettify_location(std::panic::Location::caller()), + } + } } pub fn relocate_internal(loc: &'static Location<'static>) -> impl FnOnce(Error) -> Error { diff --git a/backend/windmill-common/src/flows.rs b/backend/windmill-common/src/flows.rs index b4e23bb342a48..31152aa9383ef 100644 --- a/backend/windmill-common/src/flows.rs +++ b/backend/windmill-common/src/flows.rs @@ -736,7 +736,7 @@ pub async fn resolve_value( with_code: bool, ) -> Result<(), Error> { let mut val = serde_json::from_str::<FlowValue>(value.get()).map_err(|err| { - Error::InternalErr(format!("resolve: Failed to parse flow value: {}", err)) + Error::internal_err(format!("resolve: Failed to parse flow value: {}", err)) })?; for module in &mut val.modules { resolve_module(e, workspace_id, &mut module.value, with_code).await?; @@ -755,7 +755,7 @@ pub async fn resolve_module( use FlowModuleValue::*; let mut val = serde_json::from_str::<FlowModuleValue>(value.get()).map_err(|err| { - Error::InternalErr(format!( + Error::internal_err(format!( "resolve: Failed to parse flow module value: {}", err )) diff --git a/backend/windmill-common/src/jobs.rs b/backend/windmill-common/src/jobs.rs index 13a8dd7e8ae6d..326ba08688bd0 100644 --- a/backend/windmill-common/src/jobs.rs +++ b/backend/windmill-common/src/jobs.rs @@ -480,7 +480,7 @@ pub async fn script_hash_to_tag_and_limits<'c>( .fetch_one(&mut **db) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "querying getting tag for hash {script_hash}: {e:#}" )) })?; diff --git a/backend/windmill-common/src/s3_helpers.rs b/backend/windmill-common/src/s3_helpers.rs index 0274d0f38d362..01507f5b6a6ca 100644 --- a/backend/windmill-common/src/s3_helpers.rs +++ b/backend/windmill-common/src/s3_helpers.rs @@ -298,7 +298,7 @@ pub async fn build_s3_client(s3_resource_ref: &S3Resource) -> error::Result crate::error::Result String { pub fn decrypt(mc: &MagicCrypt256, value: String) -> error::Result { mc.decrypt_base64_to_string(value).map_err(|e| match e { - MagicCryptError::DecryptError(_) => error::Error::InternalErr( + MagicCryptError::DecryptError(_) => error::Error::internal_err( "Could not decrypt value. The value may have been encrypted with a different key."
.to_string(), ), - _ => error::Error::InternalErr(e.to_string()), + _ => error::Error::internal_err(e.to_string()), }) } diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index 2405be5b613a1..8c8b5e49572c6 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -528,7 +528,7 @@ pub async fn add_completed_job( // add_time!(bench, "add_completed_job start"); if !result.is_valid_json() { - return Err(Error::InternalErr( + return Err(Error::internal_err( "Result of job is invalid json (empty)".to_string(), )); } @@ -643,7 +643,7 @@ pub async fn add_completed_job( ) .fetch_one(&mut *tx) .await - .map_err(|e| Error::InternalErr(format!("Could not add completed job {job_id}: {e:#}")))?; + .map_err(|e| Error::internal_err(format!("Could not add completed job {job_id}: {e:#}")))?; if !queued_job.is_flow_step { @@ -835,7 +835,7 @@ pub async fn add_completed_job( .execute(&mut *tx) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "Error updating to add ended_at timestamp concurrency_key={concurrency_key}: {e:#}" )) }) { @@ -898,7 +898,7 @@ pub async fn add_completed_job( sqlx::query_scalar!("SELECT premium FROM workspace WHERE id = $1", w_id) .fetch_one(db) .await - .map_err(|e| Error::InternalErr(format!("fetching if {w_id} is premium: {e:#}")))?; + .map_err(|e| Error::internal_err(format!("fetching if {w_id} is premium: {e:#}")))?; let _ = sqlx::query!( "INSERT INTO usage (id, is_workspace, month_, usage) VALUES ($1, TRUE, EXTRACT(YEAR FROM current_date) * 12 + EXTRACT(MONTH FROM current_date), $2) @@ -907,7 +907,7 @@ pub async fn add_completed_job( additional_usage as i32) .execute(db) .await - .map_err(|e| Error::InternalErr(format!("updating usage: {e:#}"))); + .map_err(|e| Error::internal_err(format!("updating usage: {e:#}"))); if !premium_workspace { let _ = sqlx::query!( @@ -918,7 +918,7 @@ pub async fn add_completed_job( additional_usage as i32) .execute(db) .await - .map_err(|e| Error::InternalErr(format!("updating usage: {e:#}"))); + .map_err(|e| Error::internal_err(format!("updating usage: {e:#}"))); } } @@ -1213,7 +1213,7 @@ pub async fn send_error_to_workspace_handler<'a, 'c, T: Serialize + Send + Sync> .fetch_optional(db) .await .context("fetching error handler info from workspace_settings")? 
- .ok_or_else(|| Error::InternalErr(format!("no workspace settings for id {w_id}")))?; + .ok_or_else(|| Error::internal_err(format!("no workspace settings for id {w_id}")))?; if is_canceled && error_handler_muted_on_cancel { return Ok(()); @@ -1291,7 +1291,7 @@ pub async fn handle_maybe_scheduled_job<'c>( tx.commit().await?; Ok::<(), Error>(()) }) - .map_err(|e| Error::InternalErr(format!("Pushing next scheduled job timedout: {e:#}"))) + .map_err(|e| Error::internal_err(format!("Pushing next scheduled job timedout: {e:#}"))) .unwrap_or_else(|e| Err(e)) }) .retry( @@ -1901,7 +1901,7 @@ pub async fn pull( .fetch_one(&mut *tx) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "Error getting concurrency count for script path {job_script_path}: {e:#}" )) })?; @@ -1912,7 +1912,7 @@ pub async fn pull( job_concurrency_key, f64::from(job_custom_concurrency_time_window_s), ).fetch_one(&mut *tx).await.map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "Error getting completed count for key {job_concurrency_key}: {e:#}" )) })?; @@ -1928,7 +1928,7 @@ pub async fn pull( .fetch_one(&mut *tx) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "Error getting concurrency count for script path {job_script_path}: {e:#}" )) })?; @@ -1956,7 +1956,7 @@ pub async fn pull( .fetch_one(&mut *tx) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "Error decreasing concurrency count for script path {job_script_path}: {e:#}" )) })?; @@ -2033,7 +2033,7 @@ pub async fn pull( ) .fetch_all(&mut *tx) .await - .map_err(|e| Error::InternalErr(format!("Could not update and re-queue job {job_uuid}. The job will be marked as running but it is not running: {e:#}")))?; + .map_err(|e| Error::internal_err(format!("Could not update and re-queue job {job_uuid}. The job will be marked as running but it is not running: {e:#}")))?; tx.commit().await? } } @@ -2333,7 +2333,7 @@ pub async fn get_result_and_success_by_id_from_flow( .fetch_optional(db) .await? .ok_or_else(|| { - error::Error::InternalErr(format!("Could not get success from flow job status")) + error::Error::internal_err(format!("Could not get success from flow job status")) })? } }; @@ -2410,7 +2410,7 @@ async fn get_completed_flow_node_result_rec( ) -> error::Result> { for subflow in subflows { let flow_status = subflow.parse_flow_status().ok_or_else(|| { - error::Error::InternalErr(format!("Could not parse flow status of {}", subflow.id)) + error::Error::internal_err(format!("Could not parse flow status of {}", subflow.id)) })?; if let Some(node_status) = flow_status @@ -2761,7 +2761,7 @@ pub async fn push<'c, 'd>( .fetch_one(_db) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "fetching if {workspace_id} is premium and overquota: {e:#}" )) })?; @@ -2780,7 +2780,7 @@ pub async fn push<'c, 'd>( ) .fetch_one(_db) .await - .map_err(|e| Error::InternalErr(format!("updating usage: {e:#}")))?; + .map_err(|e| Error::internal_err(format!("updating usage: {e:#}")))?; let user_usage = if !premium_workspace { Some(sqlx::query_scalar!( @@ -2792,7 +2792,7 @@ pub async fn push<'c, 'd>( ) .fetch_one(_db) .await - .map_err(|e| Error::InternalErr(format!("updating usage: {e:#}")))?) + .map_err(|e| Error::internal_err(format!("updating usage: {e:#}")))?) } else { None }; @@ -3344,7 +3344,7 @@ pub async fn push<'c, 'd>( ) .fetch_optional(&mut *ntx) .await? 
- .ok_or_else(|| Error::InternalErr(format!("not found flow at path {:?}", path)))?; + .ok_or_else(|| Error::internal_err(format!("not found flow at path {:?}", path)))?; // Do not use the lite version unless all workers are updated. let data = if *DISABLE_FLOW_SCRIPT @@ -3683,12 +3683,12 @@ pub async fn push<'c, 'd>( ) .execute(&mut *tx) .await - .map_err(|e| Error::InternalErr(format!("Could not insert concurrency_key={concurrency_key} for job_id={job_id} script_path={script_path:?} workspace_id={workspace_id}: {e:#}")))?; + .map_err(|e| Error::internal_err(format!("Could not insert concurrency_key={concurrency_key} for job_id={job_id} script_path={script_path:?} workspace_id={workspace_id}: {e:#}")))?; } let stringified_args = if *JOB_ARGS_AUDIT_LOGS { Some(serde_json::to_string(&args).map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "Could not serialize args for audit log of job {job_id}: {e:#}" )) })?) @@ -3762,7 +3762,7 @@ pub async fn push<'c, 'd>( .fetch_one(&mut *tx) .warn_after_seconds(1) .await - .map_err(|e| Error::InternalErr(format!("Could not insert into queue {job_id} with tag {tag}, schedule_path {schedule_path:?}, script_path: {script_path:?}, email {email}, workspace_id {workspace_id}: {e:#}")))?; + .map_err(|e| Error::internal_err(format!("Could not insert into queue {job_id} with tag {tag}, schedule_path {schedule_path:?}, script_path: {script_path:?}, email {email}, workspace_id {workspace_id}: {e:#}")))?; tracing::debug!("Pushed {job_id}"); // TODO: technically the job isn't queued yet, as the transaction can be rolled back. Should be solved when moving these metrics to the queue abstraction. @@ -3791,7 +3791,7 @@ pub async fn push<'c, 'd>( ) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "Could not get permissions directly for job {job_id}: {e:#}" )) })? @@ -3922,7 +3922,7 @@ async fn restarted_flows_resolution( .fetch_one(db) // TODO: should we try to use the passed-in `tx` here? .await .map_err(|err| { - Error::InternalErr(format!( + Error::internal_err(format!( "completed job not found for UUID {} in workspace {}: {}", completed_flow_id, workspace_id, err )) @@ -3936,7 +3936,7 @@ async fn restarted_flows_resolution( .flow_status .as_ref() .and_then(|v| serde_json::from_str::(v.get()).ok()) - .ok_or(Error::InternalErr(format!( + .ok_or(Error::internal_err(format!( "Unable to parse flow status for job {} in workspace {}", completed_flow_id, workspace_id, )))?; @@ -3966,14 +3966,14 @@ async fn restarted_flows_resolution( match module_definition.get_value() { Ok(FlowModuleValue::BranchAll { branches, parallel, .. }) => { if parallel { - return Err(Error::InternalErr(format!( + return Err(Error::internal_err(format!( "Module {} is a parallel branchall. It can only be restarted at a given branch if it's sequential", restart_step_id, ))); } let total_branch_number = module.flow_jobs().map(|v| v.len()).unwrap_or(0); if total_branch_number <= branch_or_iteration_n { - return Err(Error::InternalErr(format!( + return Err(Error::internal_err(format!( "Branch-all module {} has only {} branches. It can't be restarted on branch {}", restart_step_id, total_branch_number, @@ -4004,14 +4004,14 @@ async fn restarted_flows_resolution( } Ok(FlowModuleValue::ForloopFlow { parallel, .. }) => { if parallel { - return Err(Error::InternalErr(format!( + return Err(Error::internal_err(format!( "Module {} is not parallel loop. 
It can only be restarted at a given iteration if it's sequential", restart_step_id, ))); } let total_iterations = module.flow_jobs().map(|v| v.len()).unwrap_or(0); if total_iterations <= branch_or_iteration_n { - return Err(Error::InternalErr(format!( + return Err(Error::internal_err(format!( "For-loop module {} doesn't cannot be restarted on iteration number {} as it has only {} iterations", restart_step_id, branch_or_iteration_n, @@ -4041,7 +4041,7 @@ async fn restarted_flows_resolution( }); } _ => { - return Err(Error::InternalErr(format!( + return Err(Error::internal_err(format!( "Module {} is not a branchall or forloop, unable to restart it at step {:?}", restart_step_id, branch_or_iteration_n @@ -4057,7 +4057,7 @@ async fn restarted_flows_resolution( step_n = step_n + 1; match module.clone() { FlowStatusModule::Success { .. } => Ok(truncated_modules.push(module)), - _ => Err(Error::InternalErr(format!( + _ => Err(Error::internal_err(format!( "Flow cannot be restarted from a non successful module", ))), }?; @@ -4066,7 +4066,7 @@ async fn restarted_flows_resolution( if !dependent_module { // step not found in flow. - return Err(Error::InternalErr(format!( + return Err(Error::internal_err(format!( "Flow cannot be restarted from step {} as it could not be found.", restart_step_id ))); diff --git a/backend/windmill-queue/src/schedule.rs b/backend/windmill-queue/src/schedule.rs index ea9f476ab6538..aa3cf5984be1f 100644 --- a/backend/windmill-queue/src/schedule.rs +++ b/backend/windmill-queue/src/schedule.rs @@ -149,7 +149,7 @@ pub async fn push_scheduled_job<'c>( if schedule.retry.is_some() { let parsed_retry = serde_json::from_value::(schedule.retry.clone().unwrap()) .map_err(|err| { - error::Error::InternalErr(format!( + error::Error::internal_err(format!( "Unable to parse retry information from schedule: {}", err.to_string(), )) diff --git a/backend/windmill-worker/src/bun_executor.rs b/backend/windmill-worker/src/bun_executor.rs index b810c888fcd98..9d914f5381e38 100644 --- a/backend/windmill-worker/src/bun_executor.rs +++ b/backend/windmill-worker/src/bun_executor.rs @@ -532,7 +532,7 @@ pub async fn generate_wrapper_mjs( format!("{job_dir}/wrapper.js"), format!("{job_dir}/wrapper.mjs"), ) - .map_err(|e| error::Error::InternalErr(format!("Could not move wrapper to mjs: {e:#}")))?; + .map_err(|e| error::Error::internal_err(format!("Could not move wrapper to mjs: {e:#}")))?; Ok(()) } @@ -674,7 +674,7 @@ pub fn copy_recursively( stack.push((entry.path(), destination, level + 1)); } else { fs::hard_link(&original, &destination).map_err(|e| { - error::Error::InternalErr(format!( + error::Error::internal_err(format!( "hard linking from {original:?} to {destination:?}: {e:#}" )) })?; @@ -823,7 +823,7 @@ async fn write_lock(splitted_lockb_2: &str, job_dir: &str, is_binary: bool) -> R "bun.lockb", &base64::engine::general_purpose::STANDARD .decode(splitted_lockb_2) - .map_err(|_| error::Error::InternalErr(format!("Could not decode bun.lockb")))?, + .map_err(|_| error::Error::internal_err(format!("Could not decode bun.lockb")))?, ) .await?; } else { @@ -1503,13 +1503,13 @@ try {{ let args = read_file(&format!("{job_dir}/args.json")) .await .map_err(|e| { - error::Error::InternalErr(format!( + error::Error::internal_err(format!( "error while reading args from preprocessing: {e:#}" )) })?; let args: HashMap> = serde_json::from_str(args.get()).map_err(|e| { - error::Error::InternalErr(format!( + error::Error::internal_err(format!( "error while deserializing args from preprocessing: {e:#}" )) })?; 
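[Editor's note: the mechanical `Error::InternalErr(...)` -> `Error::internal_err(...)` rewrite running through this patch is not purely cosmetic. The constructor added in backend/windmill-common/src/error.rs is annotated with `#[track_caller]`, so `std::panic::Location::caller()` inside it resolves to the line that invoked `internal_err`, and every converted call site records where the error was wrapped without any per-site changes. A minimal standalone sketch of the pattern, assuming a reduced one-variant enum and a stand-in `prettify_location` that renders "file:line" (the real helper lives in error.rs and its exact format is not shown in this patch):

use std::panic::Location;

#[derive(Debug)]
enum Error {
    // Mirrors the `InternalErrLoc { message, location }` variant from the patch.
    InternalErrLoc { message: String, location: String },
}

// Assumed stand-in for windmill's `prettify_location` helper.
fn prettify_location(loc: &Location<'_>) -> String {
    format!("{}:{}", loc.file(), loc.line())
}

impl Error {
    // `#[track_caller]` makes `Location::caller()` report the caller of
    // `internal_err`, not this function body.
    #[track_caller]
    fn internal_err<T: AsRef<str>>(msg: T) -> Self {
        Self::InternalErrLoc {
            message: msg.as_ref().to_string(),
            location: prettify_location(Location::caller()),
        }
    }
}

fn main() {
    let err = Error::internal_err("boom"); // `location` points at this line
    println!("{err:?}");
}

Calls wrapped in closures, as in the ubiquitous `map_err(|e| Error::internal_err(format!(...)))`, still attribute usefully: the tracked location is the line of the `internal_err` call expression inside the closure, i.e. the `map_err` site itself.]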
@@ -1632,7 +1632,7 @@ pub async fn start_worker( &base64::engine::general_purpose::STANDARD .decode(lock) .map_err(|_| { - error::Error::InternalErr("Could not decode bun.lockb".to_string()) + error::Error::internal_err("Could not decode bun.lockb".to_string()) })?, ) .await?; diff --git a/backend/windmill-worker/src/common.rs b/backend/windmill-worker/src/common.rs index 5212d80e11c8b..8c78287be9fdb 100644 --- a/backend/windmill-worker/src/common.rs +++ b/backend/windmill-worker/src/common.rs @@ -149,13 +149,13 @@ pub async fn transform_json<'a>( let inner_vs = v.get(); if (*RE_RES_VAR).is_match(inner_vs) { let value = serde_json::from_str(inner_vs).map_err(|e| { - error::Error::InternalErr(format!("Error while parsing inner arg: {e:#}")) + error::Error::internal_err(format!("Error while parsing inner arg: {e:#}")) })?; let transformed = transform_json_value(&k, &client.get_authed().await, workspace, value, job, db) .await?; let as_raw = serde_json::from_value(transformed).map_err(|e| { - error::Error::InternalErr(format!("Error while parsing inner arg: {e:#}")) + error::Error::internal_err(format!("Error while parsing inner arg: {e:#}")) })?; r.insert(k.to_string(), as_raw); } else { @@ -177,13 +177,13 @@ pub async fn transform_json_as_values<'a>( let inner_vs = v.get(); if (*RE_RES_VAR).is_match(inner_vs) { let value = serde_json::from_str(inner_vs).map_err(|e| { - error::Error::InternalErr(format!("Error while parsing inner arg: {e:#}")) + error::Error::internal_err(format!("Error while parsing inner arg: {e:#}")) })?; let transformed = transform_json_value(&k, &client.get_authed().await, workspace, value, job, db) .await?; let as_raw = serde_json::from_value(transformed).map_err(|e| { - error::Error::InternalErr(format!("Error while parsing inner arg: {e:#}")) + error::Error::internal_err(format!("Error while parsing inner arg: {e:#}")) })?; r.insert(k.to_string(), as_raw); } else { @@ -237,7 +237,7 @@ pub async fn transform_json_value( Value::String(y) if y.starts_with("$res:") => { let path = y.strip_prefix("$res:").unwrap(); if path.split("/").count() < 2 { - return Err(Error::InternalErr(format!( + return Err(Error::internal_err(format!( "Argument `{name}` is an invalid resource path: {path}", ))); } @@ -256,7 +256,7 @@ pub async fn transform_json_value( let mc = build_crypt_with_key_suffix(&db, &job.workspace_id, &job.id.to_string()).await?; decrypt(&mc, encrypted.to_string()).and_then(|x| { - serde_json::from_str(&x).map_err(|e| Error::InternalErr(e.to_string())) + serde_json::from_str(&x).map_err(|e| Error::internal_err(e.to_string())) }) // let path = y.strip_prefix("$res:").unwrap(); @@ -930,7 +930,7 @@ fn tentatively_improve_error(err: Error, executable: &str) -> Error { let err_msg = "program not found"; if err.to_string().contains(&err_msg) { - return Error::InternalErr(format!( + return Error::internal_err(format!( "Executable {executable} not found on worker. 
PATH: {}",
            *PATH_ENV
        ));
    }
@@ -967,5 +967,5 @@ pub fn build_http_client(timeout_duration: std::time::Duration) -> error::Result
         .timeout(timeout_duration)
         .connect_timeout(std::time::Duration::from_secs(10))
         .build()
-        .map_err(|e| Error::InternalErr(format!("Error building http client: {e:#}")))
+        .map_err(|e| Error::internal_err(format!("Error building http client: {e:#}")))
 }
diff --git a/backend/windmill-worker/src/dedicated_worker.rs b/backend/windmill-worker/src/dedicated_worker.rs
index ff6b8cec79ebb..840934260a0cd 100644
--- a/backend/windmill-worker/src/dedicated_worker.rs
+++ b/backend/windmill-worker/src/dedicated_worker.rs
@@ -470,7 +470,7 @@ pub async fn create_dedicated_worker_map(
                 if let Ok(v) = value {
                     if let Some(v) = v {
                         let value = serde_json::from_str::<FlowValue>(v.get()).map_err(|err| {
-                            Error::InternalErr(format!(
+                            Error::internal_err(format!(
                                 "could not convert json to flow for {flow_path}: {err:?}"
                             ))
                         });
@@ -622,7 +622,7 @@ async fn spawn_dedicated_worker(
             .bind(&w_id)
             .fetch_optional(&db)
             .await
-            .map_err(|e| Error::InternalErr(format!("expected content and lock: {e:#}")))
+            .map_err(|e| Error::internal_err(format!("expected content and lock: {e:#}")))
             .map(|x| x.map(|y| (y.0, y.1, y.2, y.3, if y.4 { y.5.map(|z| z.to_string()) } else { None })))
     };
     if let Ok(q) = q {
diff --git a/backend/windmill-worker/src/deno_executor.rs b/backend/windmill-worker/src/deno_executor.rs
index 2385771a159af..f27b8de80c28f 100644
--- a/backend/windmill-worker/src/deno_executor.rs
+++ b/backend/windmill-worker/src/deno_executor.rs
@@ -432,13 +432,13 @@ try {{
     let args = read_file(&format!("{job_dir}/args.json"))
         .await
         .map_err(|e| {
-            error::Error::InternalErr(format!(
+            error::Error::internal_err(format!(
                 "error while reading args from preprocessing: {e:#}"
             ))
         })?;
     let args: HashMap<String, Box<RawValue>> =
         serde_json::from_str(args.get()).map_err(|e| {
-            error::Error::InternalErr(format!(
+            error::Error::internal_err(format!(
                 "error while deserializing args from preprocessing: {e:#}"
             ))
         })?;
diff --git a/backend/windmill-worker/src/mysql_executor.rs b/backend/windmill-worker/src/mysql_executor.rs
index b98ec365eec9c..d733901f9358b 100644
--- a/backend/windmill-worker/src/mysql_executor.rs
+++ b/backend/windmill-worker/src/mysql_executor.rs
@@ -133,7 +133,7 @@ pub async fn do_mysql(
         .await?;

         let as_raw = serde_json::from_value(val).map_err(|e| {
-            Error::InternalErr(format!("Error while parsing inline resource: {e:#}"))
+            Error::internal_err(format!("Error while parsing inline resource: {e:#}"))
         })?;

         Some(as_raw)
diff --git a/backend/windmill-worker/src/oracledb_executor.rs b/backend/windmill-worker/src/oracledb_executor.rs
index f6e64a60a1e6a..e6292181b231b 100644
--- a/backend/windmill-worker/src/oracledb_executor.rs
+++ b/backend/windmill-worker/src/oracledb_executor.rs
@@ -328,7 +328,7 @@ pub async fn do_oracledb(
         .await?;

         let as_raw = serde_json::from_value(val).map_err(|e| {
-            Error::InternalErr(format!("Error while parsing inline resource: {e:#}"))
+            Error::internal_err(format!("Error while parsing inline resource: {e:#}"))
         })?;

         Some(as_raw)
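A design note rather than part of the patch: nearly every hunk above wraps an error as |e| Error::internal_err(format!("<context>: {e:#}")). That shape could be folded into a small extension trait, sketched here against the hypothetical stand-in Error from the earlier sketch; windmill may well prefer the explicit closures for grep-ability:

    trait InternalContext<T> {
        fn internal_ctx(self, ctx: &str) -> Result<T, Error>;
    }

    impl<T, E: std::fmt::Display> InternalContext<T> for Result<T, E> {
        fn internal_ctx(self, ctx: &str) -> Result<T, Error> {
            self.map_err(|e| Error::internal_err(format!("{ctx}: {e}")))
        }
    }

    // usage: fetch_optional(...).await.internal_ctx("expected content and lock")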
preprocessing: {e:#}" )) })?; let args: HashMap> = serde_json::from_str(args.get()).map_err(|e| { - error::Error::InternalErr(format!( + error::Error::internal_err(format!( "error while deserializing args from preprocessing: {e:#}" )) })?; @@ -1438,7 +1438,7 @@ async fn replace_pip_secret( let capture = PIP_SECRET_VARIABLE.captures(req); let variable = capture.unwrap().get(1).unwrap().as_str(); if !variable.contains("/PIP_SECRET_") { - return Err(error::Error::InternalErr(format!( + return Err(error::Error::internal_err(format!( "invalid secret variable in pip requirements, (last part of path ma): {}", req ))); diff --git a/backend/windmill-worker/src/worker.rs b/backend/windmill-worker/src/worker.rs index 2c2aefe6b3b0f..23c76173b445d 100644 --- a/backend/windmill-worker/src/worker.rs +++ b/backend/windmill-worker/src/worker.rs @@ -208,7 +208,7 @@ pub async fn create_token_for_owner( let jwt_secret = JWT_SECRET.read().await; if jwt_secret.is_empty() { - return Err(Error::InternalErr("No JWT secret found".to_string())); + return Err(Error::internal_err("No JWT secret found".to_string())); } let job_authed = match sqlx::query_as!( @@ -226,7 +226,7 @@ pub async fn create_token_for_owner( fetch_authed_from_permissioned_as(owner.to_string(), email.to_string(), w_id, db) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "Could not get permissions directly for job {job_id}: {e:#}" )) })? @@ -254,7 +254,7 @@ pub async fn create_token_for_owner( &jsonwebtoken::EncodingKey::from_secret(jwt_secret.as_bytes()), ) .map_err(|err| { - Error::InternalErr(format!( + Error::internal_err(format!( "Could not encode JWT token for job {job_id}: {:?}", err )) @@ -1330,7 +1330,9 @@ pub async fn run_worker( .bind(same_worker_job.job_id) .fetch_optional(db) .await - .map_err(|_| Error::InternalErr("Impossible to fetch same_worker job".to_string())); + .map_err(|_| { + Error::internal_err("Impossible to fetch same_worker job".to_string()) + }); if r.is_err() && !same_worker_job.recoverable { tracing::error!( worker = %worker_name, hostname = %hostname, @@ -1994,7 +1996,7 @@ async fn handle_queued_job( db, &job.workspace_id, job.parent_job - .ok_or_else(|| Error::InternalErr(format!("expected parent job")))?, + .ok_or_else(|| Error::internal_err(format!("expected parent job")))?, job.id, ) .warn_after_seconds(5) @@ -2280,7 +2282,7 @@ pub async fn get_hub_script_content_and_requirements( ) -> error::Result { let script_path = script_path .clone() - .ok_or_else(|| Error::InternalErr(format!("expected script path for hub script")))?; + .ok_or_else(|| Error::internal_err(format!("expected script path for hub script")))?; let script = get_full_hub_script_by_path(StripPath(script_path.to_string()), &HTTP_CLIENT, db).await?; @@ -2331,7 +2333,7 @@ async fn handle_code_execution_job( ) -> error::Result> { let script_hash = || { job.script_hash - .ok_or_else(|| Error::InternalErr("expected script hash".into())) + .ok_or_else(|| Error::internal_err("expected script hash")) }; let (arc_data, arc_metadata, data, metadata): ( Arc, @@ -2349,7 +2351,8 @@ async fn handle_code_execution_job( _ => None, }; - arc_data = preview.ok_or_else(|| Error::InternalErr("expected preview".to_string()))?; + arc_data = + preview.ok_or_else(|| Error::internal_err("expected preview".to_string()))?; metadata = ScriptMetadata { language: job.language, codebase, envs: None }; (arc_data.as_ref(), &metadata) } @@ -2378,7 +2381,7 @@ async fn handle_code_execution_job( let script_path = job .script_path .as_ref() - 
.ok_or_else(|| Error::InternalErr("expected script path".to_string()))?; + .ok_or_else(|| Error::internal_err("expected script path".to_string()))?; if script_path.starts_with("hub/") { let ContentReqLangEnvs { content, lockfile, language, envs, codebase } = get_hub_script_content_and_requirements(Some(script_path), Some(db)).await?; @@ -2394,7 +2397,7 @@ async fn handle_code_execution_job( ) .fetch_optional(db) .await? - .ok_or_else(|| Error::InternalErr("expected script hash".to_string()))?; + .ok_or_else(|| Error::internal_err("expected script hash".to_string()))?; (arc_data, arc_metadata) = cache::script::fetch(db, ScriptHash(hash)).await?; (arc_data.as_ref(), arc_metadata.as_ref()) @@ -2421,7 +2424,7 @@ async fn handle_code_execution_job( .await; } else if language == Some(ScriptLang::Mysql) { #[cfg(not(feature = "mysql"))] - return Err(Error::InternalErr( + return Err(Error::internal_err( "MySQL requires the mysql feature to be enabled".to_string(), )); @@ -2449,7 +2452,7 @@ async fn handle_code_execution_job( #[allow(unreachable_code)] #[cfg(not(feature = "bigquery"))] { - return Err(Error::InternalErr( + return Err(Error::internal_err( "Bigquery requires the bigquery feature to be enabled".to_string(), )); } @@ -2503,7 +2506,7 @@ async fn handle_code_execution_job( #[allow(unreachable_code)] #[cfg(not(feature = "mssql"))] { - return Err(Error::InternalErr( + return Err(Error::internal_err( "Microsoft SQL server requires the mssql feature to be enabled".to_string(), )); } @@ -2533,7 +2536,7 @@ async fn handle_code_execution_job( #[allow(unreachable_code)] #[cfg(not(feature = "oracledb"))] { - return Err(Error::InternalErr( + return Err(Error::internal_err( "Oracle DB requires the oracledb feature to be enabled".to_string(), )); } @@ -2644,7 +2647,7 @@ mount {{ } Some(ScriptLang::Python3) => { #[cfg(not(feature = "python"))] - return Err(Error::InternalErr( + return Err(Error::internal_err( "Python requires the python feature to be enabled".to_string(), )); @@ -2761,7 +2764,7 @@ mount {{ } Some(ScriptLang::Php) => { #[cfg(not(feature = "php"))] - return Err(Error::InternalErr( + return Err(Error::internal_err( "PHP requires the php feature to be enabled".to_string(), )); @@ -2785,7 +2788,7 @@ mount {{ } Some(ScriptLang::Rust) => { #[cfg(not(feature = "rust"))] - return Err(Error::InternalErr( + return Err(Error::internal_err( "Rust requires the rust feature to be enabled".to_string(), )); @@ -2809,7 +2812,7 @@ mount {{ } Some(ScriptLang::Ansible) => { #[cfg(not(feature = "python"))] - return Err(Error::InternalErr( + return Err(Error::internal_err( "Ansible requires the python feature to be enabled".to_string(), )); diff --git a/backend/windmill-worker/src/worker_flow.rs b/backend/windmill-worker/src/worker_flow.rs index cf24258b9e08c..37fc023a8d7ac 100644 --- a/backend/windmill-worker/src/worker_flow.rs +++ b/backend/windmill-worker/src/worker_flow.rs @@ -233,7 +233,7 @@ pub async fn update_flow_status_after_job_completion_internal( .fetch_one(db) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "fetching flow status {flow} while reporting {success} {result:?}: {e:#}" )) }) @@ -242,7 +242,7 @@ pub async fn update_flow_status_after_job_completion_internal( record.job_kind, record.script_hash, serde_json::from_str::(record.flow_status.0.get()).map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "requiring current module to be parsable as FlowStatus: {e:?}" )) })?, @@ -264,12 +264,12 @@ pub async fn 
@@ -264,12 +264,12 @@ pub async fn update_flow_status_after_job_completion_internal(
         Step::PreprocessorStep => old_status
             .preprocessor_module
             .as_ref()
-            .ok_or_else(|| Error::InternalErr(format!("preprocessor module not found")))?,
+            .ok_or_else(|| Error::internal_err(format!("preprocessor module not found")))?,
         Step::FailureStep => &old_status.failure_module.module_status,
         Step::Step(i) => old_status
             .modules
             .get(i as usize)
-            .ok_or_else(|| Error::InternalErr(format!("module {i} not found")))?,
+            .ok_or_else(|| Error::internal_err(format!("module {i} not found")))?,
     };

     // tracing::debug!(
@@ -366,7 +366,7 @@ pub async fn update_flow_status_after_job_completion_internal(
                             .fetch_one(db)
                             .await
                             .map_err(|e| {
-                                Error::InternalErr(format!("retrieval of args from state: {e:#}"))
+                                Error::internal_err(format!("retrieval of args from state: {e:#}"))
                             })?;
                         compute_bool_from_expr(
                             &expr,
@@ -418,7 +418,7 @@ pub async fn update_flow_status_after_job_completion_internal(
                     job_id_for_status,
                     flow
                 ).execute(db).await.map_err(|e| {
-                    Error::InternalErr(format!("error while updating args in preprocessing step: {e:#}"))
+                    Error::internal_err(format!("error while updating args in preprocessing step: {e:#}"))
                 })?;

                 sqlx::query!(
@@ -428,7 +428,7 @@ pub async fn update_flow_status_after_job_completion_internal(
                 .execute(db)
                 .await
                 .map_err(|e| {
-                    Error::InternalErr(format!(
+                    Error::internal_err(format!(
                         "error while deleting args of preprocessing step: {e:#}"
                     ))
                 })?;
@@ -483,11 +483,11 @@ pub async fn update_flow_status_after_job_completion_internal(
                 }
                 .fetch_one(&mut *tx)
                 .await.map_err(|e| {
-                    Error::InternalErr(format!(
+                    Error::internal_err(format!(
                         "error while fetching iterator index: {e:#}"
                     ))
                 })?
-                .ok_or_else(|| Error::InternalErr(format!("requiring an index in InProgress")))?;
+                .ok_or_else(|| Error::internal_err(format!("requiring an index in InProgress")))?;
                 tracing::info!(
                     "parallel iteration {job_id_for_status} of flow {flow} update nindex: {nindex} len: {len}",
                     nindex = nindex,
@@ -528,14 +528,14 @@ pub async fn update_flow_status_after_job_completion_internal(
                 .fetch_one(&mut *tx)
                 .await
                 .map_err(|e| {
-                    Error::InternalErr(format!(
+                    Error::internal_err(format!(
                         "error while fetching branchall index: {e:#}"
                     ))
                 })?
-                .ok_or_else(|| Error::InternalErr(format!("requiring an index in InProgress")))?;
+                .ok_or_else(|| Error::internal_err(format!("requiring an index in InProgress")))?;
                 (nindex, *len as i32)
             }
-            _ => Err(Error::InternalErr(format!(
+            _ => Err(Error::internal_err(format!(
                 "unexpected status for parallel module"
             )))?,
         };
@@ -558,7 +558,7 @@ pub async fn update_flow_status_after_job_completion_internal(
             .fetch_all(&mut *tx)
             .await
             .map_err(|e| {
-                Error::InternalErr(format!(
+                Error::internal_err(format!(
                     "error while fetching success from completed_jobs: {e:#}"
                 ))
             })?
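A pattern worth naming in the iterator and branchall hunks above: the query itself can fail (a database error) and the returned row or field can be legitimately absent, and the two cases get distinct messages via map_err and ok_or_else. A standalone sketch with stand-in types instead of sqlx:

    fn fetch_index() -> Result<Option<i32>, String> {
        Ok(Some(3)) // stand-in for the UPDATE ... RETURNING round-trip
    }

    fn next_index() -> Result<i32, String> {
        fetch_index()
            .map_err(|e| format!("error while fetching iterator index: {e}"))? // query failed
            .ok_or_else(|| "requiring an index in InProgress".to_string()) // row or field missing
    }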
@@ -591,7 +591,7 @@ pub async fn update_flow_status_after_job_completion_internal(
             "DELETE FROM parallel_monitor_lock WHERE parent_flow_id = $1 RETURNING last_ping",
             flow,
         ).fetch_optional(db).await.map_err(|e| {
-            Error::InternalErr(format!(
+            Error::internal_err(format!(
                 "error while deleting parallel_monitor_lock: {e:#}"
             ))
         })?;
@@ -618,7 +618,7 @@ pub async fn update_flow_status_after_job_completion_internal(
             .execute(db)
             .await
             .map_err(|e| {
-                Error::InternalErr(format!(
+                Error::internal_err(format!(
                     "error resuming job at suspend {nindex} and parent {flow}: {e:#}"
                 ))
             })?;
@@ -629,7 +629,7 @@ pub async fn update_flow_status_after_job_completion_internal(
                 flow,
                 job_id_for_status
             ).fetch_optional(db).await.map_err(|e| {
-                Error::InternalErr(format!("error while removing parallel_monitor_lock: {e:#}"))
+                Error::internal_err(format!("error while removing parallel_monitor_lock: {e:#}"))
             })?;
             if r.is_some() {
                 tracing::info!(
@@ -720,7 +720,9 @@ pub async fn update_flow_status_after_job_completion_internal(
             )
             .fetch_one(db)
             .await
-            .map_err(|e| Error::InternalErr(format!("error during skip check: {e:#}")))?
+            .map_err(|e| {
+                Error::internal_err(format!("error during skip check: {e:#}"))
+            })?
             .unwrap_or(false)
         } else {
             false
@@ -777,7 +779,7 @@ pub async fn update_flow_status_after_job_completion_internal(
             .execute(&mut *tx)
             .await
             .map_err(|e| {
-                Error::InternalErr(format!("error while setting flow index for {flow}: {e:#}"))
+                Error::internal_err(format!("error while setting flow index for {flow}: {e:#}"))
             })?;
             old_status.step + 1
         } else {
@@ -797,7 +799,7 @@ pub async fn update_flow_status_after_job_completion_internal(
             )
             .fetch_one(&mut *tx)
             .await.map_err(|e| {
-                Error::InternalErr(format!(
+                Error::internal_err(format!(
                     "error while fetching failure module: {e:#}"
                 ))
             })?;
@@ -815,7 +817,7 @@ pub async fn update_flow_status_after_job_completion_internal(
             .execute(&mut *tx)
             .await
             .map_err(|e| {
-                Error::InternalErr(format!(
+                Error::internal_err(format!(
                     "error while setting flow status in failure step: {e:#}"
                 ))
             })?;
@@ -830,7 +832,7 @@ pub async fn update_flow_status_after_job_completion_internal(
             .execute(&mut *tx)
             .await
             .map_err(|e| {
-                Error::InternalErr(format!(
+                Error::internal_err(format!(
                     "error while setting flow status in preprocessing step: {e:#}"
                 ))
             })?;
@@ -846,7 +848,7 @@ pub async fn update_flow_status_after_job_completion_internal(
             .execute(&mut *tx)
             .await
             .map_err(|e| {
-                Error::InternalErr(format!("error while setting new flow status: {e:#}"))
+                Error::internal_err(format!("error while setting new flow status: {e:#}"))
             })?;

         if let Some(job_result) = new_status.job_result() {
@@ -860,7 +862,7 @@ pub async fn update_flow_status_after_job_completion_internal(
             )
             .execute(&mut *tx)
             .await.map_err(|e| {
-                Error::InternalErr(format!(
+                Error::internal_err(format!(
                     "error while setting leaf jobs: {e:#}"
                 ))
             })?;
@@ -893,7 +895,7 @@ pub async fn update_flow_status_after_job_completion_internal(
                 .fetch_one(db)
                 .await
                 .map_err(|e| {
-                    Error::InternalErr(format!("retrieval of args from state: {e:#}"))
+                    Error::internal_err(format!("retrieval of args from state: {e:#}"))
                 })?;

             let should_stop = compute_bool_from_expr(
@@ -947,7 +949,7 @@ pub async fn update_flow_status_after_job_completion_internal(
         .fetch_optional(&mut *tx)
         .await
         .map_err(Into::<Error>::into)?
- .ok_or_else(|| Error::InternalErr(format!("requiring flow to be in the queue")))?; + .ok_or_else(|| Error::internal_err(format!("requiring flow to be in the queue")))?; tx.commit().await?; let job_root = flow_job @@ -1044,7 +1046,7 @@ pub async fn update_flow_status_after_job_completion_internal( .execute(db) .await .map_err(|e| { - Error::InternalErr(format!("error while cleaning up completed_job: {e:#}")) + Error::internal_err(format!("error while cleaning up completed_job: {e:#}")) })?; } } @@ -1200,7 +1202,7 @@ async fn set_success_in_flow_job_success<'c>( ) .execute(&mut **tx) .await.map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "error while setting flow_jobs_success: {e:#}" )) })?; @@ -1232,7 +1234,7 @@ async fn retrieve_flow_jobs_results( .map(|j| { results .get(j) - .ok_or_else(|| Error::InternalErr(format!("missing job result for {}", j))) + .ok_or_else(|| Error::internal_err(format!("missing job result for {}", j))) }) .collect::, _>>()?; @@ -1252,19 +1254,19 @@ async fn compute_skip_branchall_failure<'c>( .fetch_one(db) .await .map_err(|e| { - Error::InternalErr(format!("error during retrieval of branchall index: {e:#}")) + Error::internal_err(format!("error during retrieval of branchall index: {e:#}")) })? .map(|p| { BRANCHALL_INDEX_RE .captures(&p) .map(|x| x.get(1).unwrap().as_str().parse::().ok()) .flatten() - .ok_or(Error::InternalErr(format!( + .ok_or(Error::internal_err(format!( "could not parse branchall index from path: {p}" ))) }) .ok_or_else(|| { - Error::InternalErr(format!("no branchall script path found for job {job}")) + Error::internal_err(format!("no branchall script path found for job {job}")) })?? } else { branch as i32 @@ -1288,12 +1290,12 @@ async fn compute_skip_branchall_failure<'c>( // ) // .fetch_one(db) // .await -// .map_err(|e| Error::InternalErr(format!("error during retrieval of cleanup module: {e:#}")))?; +// .map_err(|e| Error::internal_err(format!("error during retrieval of cleanup module: {e:#}")))?; // raw_value // .clone() // .and_then(|rv| serde_json::from_value::(rv).ok()) -// .ok_or(Error::InternalErr(format!( +// .ok_or(Error::internal_err(format!( // "Unable to parse flow cleanup module {:?}", // raw_value // ))) @@ -1427,12 +1429,12 @@ pub async fn get_step_of_flow_status(db: &DB, id: Uuid) -> error::Result { ) .fetch_one(db) .await - .map_err(|e| Error::InternalErr(format!("fetching step flow status: {e:#}")))?; + .map_err(|e| Error::internal_err(format!("fetching step flow status: {e:#}")))?; if let Some(step) = r.step { Ok(Step::from_i32_and_len(step, r.len.unwrap_or(0) as usize)) } else { - Err(Error::InternalErr("step is null".to_string())) + Err(Error::internal_err("step is null".to_string())) } } @@ -1666,7 +1668,7 @@ async fn push_next_flow_job( }) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "error sending update flow message to job completed channel: {e:#}" )) })?; @@ -1711,7 +1713,7 @@ async fn push_next_flow_job( }) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "error sending update flow message to job completed channel: {e:#}" )) })?; @@ -1748,7 +1750,7 @@ async fn push_next_flow_job( }) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "error sending update flow message to job completed channel: {e:#}" )) })?; @@ -1897,7 +1899,10 @@ async fn push_next_flow_job( && suspend.continue_on_disapprove_timeout.unwrap_or(false); let audit_author = AuditAuthor { - username: 
flow_job.permissioned_as.trim_start_matches("u/").to_string(), + username: flow_job + .permissioned_as + .trim_start_matches("u/") + .to_string(), email: flow_job.email.clone(), username_override: None, }; @@ -2038,7 +2043,7 @@ async fn push_next_flow_job( }) .await .map_err(|e| { - Error::InternalErr(format!( + Error::internal_err(format!( "error sending update flow message to job completed channel: {e:#}" )) })?; @@ -2287,7 +2292,7 @@ async fn push_next_flow_job( .unwrap(), ) .map(Marc::new) - .map_err(|e| error::Error::InternalErr(format!("identity: {e:#}"))) + .map_err(|e| error::Error::internal_err(format!("identity: {e:#}"))) } Ok( FlowModuleValue::Script { input_transforms, .. } @@ -2313,7 +2318,7 @@ async fn push_next_flow_job( } Ok(_) => Ok(arc_flow_job_args.clone()), Err(e) => { - return Err(error::Error::InternalErr(format!( + return Err(error::Error::internal_err(format!( "module was not convertible to acceptable value {e:?}" ))) } @@ -2637,7 +2642,7 @@ async fn push_next_flow_job( if payload_tag.delete_after_use { let uuid_singleton_json = serde_json::to_value(&[uuid]).map_err(|e| { - error::Error::InternalErr(format!("Unable to serialize uuid: {e:#}")) + error::Error::internal_err(format!("Unable to serialize uuid: {e:#}")) })?; sqlx::query( diff --git a/backend/windmill-worker/src/worker_lockfiles.rs b/backend/windmill-worker/src/worker_lockfiles.rs index d4df729910ec1..3437d692de5c3 100644 --- a/backend/windmill-worker/src/worker_lockfiles.rs +++ b/backend/windmill-worker/src/worker_lockfiles.rs @@ -265,13 +265,13 @@ pub async fn handle_dependency_job( Some(hash) => &cache::script::fetch(db, hash).await?.0, _ => match preview_data { Some(RawData::Script(data)) => data, - _ => return Err(Error::InternalErr("expected script hash".into())), + _ => return Err(Error::internal_err("expected script hash")), }, }; let content = capture_dependency_job( &job.id, job.language.as_ref().map(|v| Ok(v)).unwrap_or_else(|| { - Err(Error::InternalErr( + Err(Error::internal_err( "Job Language required for dependency jobs".to_owned(), )) })?, @@ -551,7 +551,7 @@ pub async fn handle_flow_dependency_job( occupancy_metrics: &mut OccupancyMetrics, ) -> error::Result> { let job_path = job.script_path.clone().ok_or_else(|| { - error::Error::InternalErr( + error::Error::internal_err( "Cannot resolve flow dependencies for flow without path".to_string(), ) })?; @@ -574,7 +574,7 @@ pub async fn handle_flow_dependency_job( job.script_hash .clone() .ok_or_else(|| { - Error::InternalErr( + Error::internal_err( "Flow Dependency requires script hash (flow version)".to_owned(), ) })? 
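compute_skip_branchall_failure (further up) recovers the branch number from a script path with BRANCHALL_INDEX_RE. The capture-then-parse chain, sketched with a hypothetical pattern — the real regex is defined elsewhere in worker_flow.rs and may differ:

    use regex::Regex;

    fn branch_index(script_path: &str) -> Option<i32> {
        // hypothetical shape of BRANCHALL_INDEX_RE
        let re = Regex::new(r"/branchall-(\d+)$").ok()?;
        re.captures(script_path)
            .and_then(|caps| caps.get(1))
            .and_then(|m| m.as_str().parse::<i32>().ok())
    }

The hunk's .map(...).flatten() over the capture is equivalent to the and_then chain here.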
@@ -602,7 +602,7 @@ pub async fn handle_flow_dependency_job(
         Some(ScriptHash(id)) => cache::flow::fetch_version(db, id).await?,
         _ => match preview_data {
             Some(RawData::Flow(data)) => data.clone(),
-            _ => return Err(Error::InternalErr("expected script hash".into())),
+            _ => return Err(Error::internal_err("expected script hash")),
         },
     }
     .value()
@@ -649,7 +649,7 @@ pub async fn handle_flow_dependency_job(

     if !skip_flow_update {
         let version = version.ok_or_else(|| {
-            Error::InternalErr("Flow Dependency requires script hash (flow version)".to_owned())
+            Error::internal_err("Flow Dependency requires script hash (flow version)".to_owned())
         })?;

         sqlx::query!(
@@ -1163,7 +1163,7 @@ async fn reduce_flow<'c>(
     for module in &mut *modules {
         let mut val = serde_json::from_str::<FlowModuleValue>(module.value.get()).map_err(|err| {
-            Error::InternalErr(format!(
+            Error::internal_err(format!(
                 "reduce_flow: Failed to parse flow module value: {}",
                 err
             ))
@@ -1274,7 +1274,7 @@ async fn reduce_app(db: &sqlx::Pool<Postgres>, value: &mut Value, app: i64
         // replace `content` with an empty string:
         let Some(Value::String(code)) = script.get_mut("content").map(std::mem::take) else {
-            return Err(error::Error::InternalErr(
+            return Err(error::Error::internal_err(
                 "Missing `content` in inlineScript".to_string(),
             ));
         };
@@ -1477,7 +1477,7 @@ pub async fn handle_app_dependency_job(
     occupancy_metrics: &mut OccupancyMetrics,
 ) -> error::Result<()> {
     let job_path = job.script_path.clone().ok_or_else(|| {
-        error::Error::InternalErr(
+        error::Error::internal_err(
             "Cannot resolve app dependencies for app without path".to_string(),
         )
     })?;

     let id = job
         .script_hash
         .clone()
-        .ok_or_else(|| Error::InternalErr("App Dependency requires script hash".to_owned()))?
+        .ok_or_else(|| Error::internal_err("App Dependency requires script hash".to_owned()))?
         .0;
     let record = sqlx::query!("SELECT app_id, value FROM app_version WHERE id = $1", id)
         .fetch_optional(db)
         .await
@@ -1572,7 +1572,7 @@ pub async fn handle_app_dependency_job(
             //     match tx {
             //         PushIsolationLevel::Transaction(tx) => tx.commit().await?,
             //         _ => {
-            //             return Err(Error::InternalErr(
+            //             return Err(Error::internal_err(
             //                 "Expected a transaction here".to_string(),
             //             ));
             //         }
@@ -1682,7 +1682,7 @@ async fn capture_dependency_job(
     match job_language {
         ScriptLang::Python3 => {
             #[cfg(not(feature = "python"))]
-            return Err(Error::InternalErr(
+            return Err(Error::internal_err(
                 "Python requires the python feature to be enabled".to_string(),
             ));
             #[cfg(feature = "python")]
@@ -1749,7 +1749,7 @@ async fn capture_dependency_job(
         }
         ScriptLang::Ansible => {
             #[cfg(not(feature = "python"))]
-            return Err(Error::InternalErr(
+            return Err(Error::internal_err(
                 "Ansible requires the python feature to be enabled".to_string(),
             ));
@@ -1887,7 +1887,7 @@ async fn capture_dependency_job(
         }
         ScriptLang::Php => {
             #[cfg(not(feature = "php"))]
-            return Err(Error::InternalErr(
+            return Err(Error::internal_err(
                 "PHP requires the php feature to be enabled".to_string(),
             ));
@@ -1929,7 +1929,7 @@ async fn capture_dependency_job(
             }

             #[cfg(not(feature = "rust"))]
-            return Err(Error::InternalErr(
+            return Err(Error::internal_err(
                 "Rust requires the rust feature to be enabled".to_string(),
             ));
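The feature gates in capture_dependency_job above rely on cfg-attributed early returns inside one function body. An equivalent and sometimes clearer formulation gates whole items instead; a sketch using the hypothetical Error stand-in from earlier, with an invented lock_python_deps helper purely for illustration:

    #[cfg(not(feature = "python"))]
    fn lock_python_deps(_requirements: &str) -> Result<String, Error> {
        Err(Error::internal_err(
            "Python requires the python feature to be enabled".to_string(),
        ))
    }

    #[cfg(feature = "python")]
    fn lock_python_deps(requirements: &str) -> Result<String, Error> {
        // the real resolver would invoke the Python tooling here
        Ok(format!("# locked from: {requirements}"))
    }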