From 4b444364ff310efd66644a516a5f4462bdf6871d Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Thu, 19 Dec 2024 19:00:57 +1100
Subject: [PATCH 1/2] c

---
 crates/polars-io/src/cloud/glob.rs            | 22 +--
 .../polars-io/src/cloud/object_store_setup.rs | 42 ++++--
 crates/polars-io/src/cloud/options.rs         | 126 ++++++++++++++++--
 .../src/plans/conversion/dsl_to_ir.rs         | 18 ++-
 4 files changed, 166 insertions(+), 42 deletions(-)

diff --git a/crates/polars-io/src/cloud/glob.rs b/crates/polars-io/src/cloud/glob.rs
index c55d7767f39f..9ad4595faa53 100644
--- a/crates/polars-io/src/cloud/glob.rs
+++ b/crates/polars-io/src/cloud/glob.rs
@@ -1,8 +1,9 @@
 use futures::TryStreamExt;
 use object_store::path::Path;
 use polars_core::error::to_compute_err;
-use polars_core::prelude::{polars_ensure, polars_err};
-use polars_error::PolarsResult;
+use polars_core::prelude::polars_ensure;
+use polars_error::{polars_bail, PolarsResult};
+use polars_utils::format_pl_smallstr;
 use polars_utils::pl_str::PlSmallStr;
 use regex::Regex;
 use url::Url;
@@ -98,13 +99,16 @@ impl CloudLocation {
         }
 
         let key = parsed.path();
-            let bucket = parsed
-                .host()
-                .ok_or_else(
-                    || polars_err!(ComputeError: "cannot parse bucket (host) from url: {}", parsed),
-                )?
-                .to_string()
-                .into();
+
+            let bucket = format_pl_smallstr!(
+                "{}",
+                &parsed[url::Position::BeforeUsername..url::Position::AfterPort]
+            );
+
+            if bucket.is_empty() {
+                polars_bail!(ComputeError: "CloudLocation::from_url(): empty bucket: {}", parsed);
+            }
+
             (bucket, key)
         };

diff --git a/crates/polars-io/src/cloud/object_store_setup.rs b/crates/polars-io/src/cloud/object_store_setup.rs
index 22e666a8198b..85e61d721fd0 100644
--- a/crates/polars-io/src/cloud/object_store_setup.rs
+++ b/crates/polars-io/src/cloud/object_store_setup.rs
@@ -7,6 +7,7 @@ use polars_core::config;
 use polars_error::{polars_bail, to_compute_err, PolarsError, PolarsResult};
 use polars_utils::aliases::PlHashMap;
 use polars_utils::pl_str::PlSmallStr;
+use polars_utils::{format_pl_smallstr, pl_serialize};
 use tokio::sync::RwLock;
 use url::Url;
 
@@ -17,7 +18,7 @@ use crate::cloud::CloudConfig;
 /// get rate limited when querying the DNS (can take up to 5s).
 /// Other reasons are connection pools that must be shared between as much as possible.
 #[allow(clippy::type_complexity)]
-static OBJECT_STORE_CACHE: Lazy<RwLock<PlHashMap<String, PolarsObjectStore>>> =
+static OBJECT_STORE_CACHE: Lazy<RwLock<PlHashMap<Vec<u8>, PolarsObjectStore>>> =
     Lazy::new(Default::default);
 
 #[allow(dead_code)]
@@ -29,10 +30,10 @@ fn err_missing_feature(feature: &str, scheme: &str) -> PolarsResult<Arc<dyn ObjectStore>> {
 }
 
 /// Get the key of a url for object store registration.
-fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String {
+fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> Vec<u8> {
     #[derive(Clone, Debug, PartialEq, Hash, Eq)]
     #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
-    struct S {
+    struct C {
         max_retries: usize,
         #[cfg(feature = "file_cache")]
         file_cache_ttl: u64,
@@ -41,8 +42,15 @@ fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String {
         credential_provider: usize,
     }
 
+    #[derive(Clone, Debug, PartialEq, Hash, Eq)]
+    #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
+    struct S {
+        url_base: PlSmallStr,
+        cloud_options: Option<C>,
+    }
+
     // We include credentials as they can expire, so users will send new credentials for the same url.
-    let creds = serde_json::to_string(&options.map(
+    let cloud_options = options.map(
         |CloudOptions {
              // Destructure to ensure this breaks if anything changes.
             max_retries,
@@ -52,7 +60,7 @@ fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String {
             #[cfg(feature = "cloud")]
             credential_provider,
         }| {
-            S {
+            C {
                 max_retries: *max_retries,
                 #[cfg(feature = "file_cache")]
                 file_cache_ttl: *file_cache_ttl,
@@ -61,15 +69,21 @@ fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String {
                 credential_provider: credential_provider.as_ref().map_or(0, |x| x.func_addr()),
             }
         },
-    ))
-    .unwrap();
-
-    format!(
-        "{}://{}<\\creds\\>{}",
-        url.scheme(),
-        &url[url::Position::BeforeHost..url::Position::AfterPort],
-        creds
-    )
+    );
+
+    let cache_key = S {
+        url_base: format_pl_smallstr!(
+            "{}",
+            &url[url::Position::BeforeScheme..url::Position::AfterPort]
+        ),
+        cloud_options,
+    };
+
+    if config::verbose() {
+        eprintln!("object store cache key: {} {:?}", url, &cache_key);
+    }
+
+    pl_serialize::serialize_to_bytes(&cache_key).unwrap()
 }
 
 /// Construct an object_store `Path` from a string without any encoding/decoding.
diff --git a/crates/polars-io/src/cloud/options.rs b/crates/polars-io/src/cloud/options.rs
index de5f8903eabd..4159671204f8 100644
--- a/crates/polars-io/src/cloud/options.rs
+++ b/crates/polars-io/src/cloud/options.rs
@@ -25,8 +25,6 @@ use polars_error::*;
 #[cfg(feature = "aws")]
 use polars_utils::cache::FastFixedCache;
 #[cfg(feature = "aws")]
-use polars_utils::pl_str::PlSmallStr;
-#[cfg(feature = "aws")]
 use regex::Regex;
 #[cfg(feature = "http")]
 use reqwest::header::HeaderMap;
@@ -43,8 +41,11 @@ use crate::file_cache::get_env_file_cache_ttl;
 use crate::pl_async::with_concurrency_budget;
 
 #[cfg(feature = "aws")]
-static BUCKET_REGION: Lazy<std::sync::Mutex<FastFixedCache<PlSmallStr, PlSmallStr>>> =
-    Lazy::new(|| std::sync::Mutex::new(FastFixedCache::new(32)));
+static BUCKET_REGION: Lazy<
+    std::sync::Mutex<
+        FastFixedCache<polars_utils::pl_str::PlSmallStr, polars_utils::pl_str::PlSmallStr>,
+    >,
+> = Lazy::new(|| std::sync::Mutex::new(FastFixedCache::new(32)));
 
 /// The type of the config keys must satisfy the following requirements:
 /// 1. must be easily collected into a HashMap, the type required by the object_crate API.
@@ -406,16 +407,20 @@ impl CloudOptions {
     pub fn build_azure(&self, url: &str) -> PolarsResult<impl ObjectStore> {
         use super::credential_provider::IntoCredentialProvider;
 
-        let mut builder = if self.credential_provider.is_none() {
-            MicrosoftAzureBuilder::from_env()
-        } else {
-            MicrosoftAzureBuilder::new()
-        };
+        let mut storage_account: Option<polars_utils::pl_str::PlSmallStr> = None;
+
+        // The credential provider `self.credential_provider` takes priority if it is set. We still
+        // call `from_env()`, as it may pick up a storage account name configured in the environment.
+        let mut builder = MicrosoftAzureBuilder::from_env();
+
         if let Some(options) = &self.config {
             let CloudConfig::Azure(options) = options else {
                 panic!("impl error: cloud type mismatch")
             };
             for (key, value) in options.iter() {
+                if key == &AzureConfigKey::AccountName {
+                    storage_account = Some(value.into());
+                }
                 builder = builder.with_config(*key, value);
             }
         }
@@ -425,8 +430,18 @@ impl CloudOptions {
             .with_url(url)
             .with_retry(get_retry_config(self.max_retries));
 
+        // Prefer the storage account embedded in the path.
+        storage_account = extract_adls_uri_storage_account(url)
+            .map(|x| x.into())
+            .or(storage_account);
+
         let builder = if let Some(v) = self.credential_provider.clone() {
             builder.with_credentials(v.into_azure_provider())
+        } else if let Some(v) = storage_account
+            .as_deref()
+            .and_then(get_azure_storage_account_key)
+        {
+            builder.with_access_key(v)
         } else {
             builder
         };
@@ -610,6 +625,99 @@ impl CloudOptions {
     }
 }
 
+/// ```text
+/// "abfss://{CONTAINER}@{STORAGE_ACCOUNT}.dfs.core.windows.net/"
+///                      ^^^^^^^^^^^^^^^^^
+/// ```
+#[cfg(feature = "azure")]
+fn extract_adls_uri_storage_account(path: &str) -> Option<&str> {
+    Some(
+        path.split_once("://")?
+            .1
+            .split_once('/')?
+            .0
+            .split_once('@')?
+            .1
+            .split_once(".dfs.core.windows.net")?
+            .0,
+    )
+}
+
+/// Attempt to retrieve the storage account key for this account using the Azure CLI.
+#[cfg(feature = "azure")]
+fn get_azure_storage_account_key(account_name: &str) -> Option<String> {
+    if polars_core::config::verbose() {
+        eprintln!(
+            "get_azure_storage_account_key: storage_account_name: {}",
+            account_name
+        );
+    }
+
+    let mut cmd = if cfg!(target_family = "windows") {
+        // https://github.com/apache/arrow-rs/blob/565c24b8071269b02c3937e34c51eacf0f4cbad6/object_store/src/azure/credential.rs#L877-L894
+        let mut v = std::process::Command::new("cmd");
+        v.args([
+            "/C",
+            "az",
+            "storage",
+            "account",
+            "keys",
+            "list",
+            "--output",
+            "json",
+            "--account-name",
+            account_name,
+        ]);
+        v
+    } else {
+        let mut v = std::process::Command::new("az");
+        v.args([
+            "storage",
+            "account",
+            "keys",
+            "list",
+            "--output",
+            "json",
+            "--account-name",
+            account_name,
+        ]);
+        v
+    };
+
+    let json_resp = cmd
+        .output()
+        .ok()
+        .filter(|x| x.status.success())
+        .map(|x| String::from_utf8(x.stdout))?
+        .ok()?;
+
+    // [
+    //     {
+    //         "creationTime": "1970-01-01T00:00:00.000000+00:00",
+    //         "keyName": "key1",
+    //         "permissions": "FULL",
+    //         "value": "..."
+    //     },
+    //     {
+    //         "creationTime": "1970-01-01T00:00:00.000000+00:00",
+    //         "keyName": "key2",
+    //         "permissions": "FULL",
+    //         "value": "..."
+    //     }
+    // ]
+
+    #[derive(Debug, serde::Deserialize)]
+    struct S {
+        value: String,
+    }
+
+    let resp: Vec<S> = serde_json::from_str(&json_resp).ok()?;
+
+    let access_key = resp.into_iter().next()?.value;
+
+    Some(access_key)
+}
+
 #[cfg(feature = "cloud")]
 #[cfg(test)]
 mod tests {
diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
index 34f03e6debdd..0ad4bb6bb0ee 100644
--- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
+++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
@@ -163,21 +163,19 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult<IR> {
         let sources = match &scan_type {
             #[cfg(feature = "parquet")]
-            FileScan::Parquet {
-                ref cloud_options, ..
-            } => sources
+            FileScan::Parquet { cloud_options, .. } => sources
                 .expand_paths_with_hive_update(&mut file_options, cloud_options.as_ref())?,
             #[cfg(feature = "ipc")]
-            FileScan::Ipc {
-                ref cloud_options, ..
-            } => sources
+            FileScan::Ipc { cloud_options, .. } => sources
                 .expand_paths_with_hive_update(&mut file_options, cloud_options.as_ref())?,
             #[cfg(feature = "csv")]
-            FileScan::Csv {
-                ref cloud_options, ..
-            } => sources.expand_paths(&file_options, cloud_options.as_ref())?,
+            FileScan::Csv { cloud_options, .. } => {
+                sources.expand_paths(&file_options, cloud_options.as_ref())?
+            },
             #[cfg(feature = "json")]
-            FileScan::NDJson { .. } => sources.expand_paths(&file_options, None)?,
+            FileScan::NDJson { cloud_options, .. } => {
+                sources.expand_paths(&file_options, cloud_options.as_ref())?
+            },
             FileScan::Anonymous { .. } => sources,
         };
 

From 9b1bf2639e3949f834a0fb4bcfe00c12ac2babed Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Thu, 19 Dec 2024 19:05:16 +1100
Subject: [PATCH 2/2] c

---
 crates/polars-io/src/cloud/object_store_setup.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/crates/polars-io/src/cloud/object_store_setup.rs b/crates/polars-io/src/cloud/object_store_setup.rs
index 85e61d721fd0..f7b47412f167 100644
--- a/crates/polars-io/src/cloud/object_store_setup.rs
+++ b/crates/polars-io/src/cloud/object_store_setup.rs
@@ -32,7 +32,7 @@ fn err_missing_feature(feature: &str, scheme: &str) -> PolarsResult<Arc<dyn ObjectStore>> {
 /// Get the key of a url for object store registration.
 fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> Vec<u8> {
     #[derive(Clone, Debug, PartialEq, Hash, Eq)]
-    #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
+    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
     struct C {
         max_retries: usize,
         #[cfg(feature = "file_cache")]
@@ -43,7 +43,7 @@ fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> Vec<u8> {
     }
 
     #[derive(Clone, Debug, PartialEq, Hash, Eq)]
-    #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
+    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
     struct S {
        url_base: PlSmallStr,
        cloud_options: Option<C>,
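
The ADLS storage-account extraction added to options.rs above can be exercised in isolation. Below is a minimal standalone Rust sketch that mirrors the chained `split_once` calls of `extract_adls_uri_storage_account`; the container and account names in the example URI are invented purely for illustration. When the helper returns None (non-ADLS URLs), `build_azure` falls back to any `AzureConfigKey::AccountName` value from the config, and the Azure CLI key lookup only runs when no credential provider is set.

// Standalone sketch; mirrors the helper added in crates/polars-io/src/cloud/options.rs.
// "abfss://{CONTAINER}@{STORAGE_ACCOUNT}.dfs.core.windows.net/..." -> "{STORAGE_ACCOUNT}"
fn extract_adls_uri_storage_account(path: &str) -> Option<&str> {
    Some(
        path.split_once("://")? // drop the scheme ("abfss")
            .1
            .split_once('/')? // keep only the authority component
            .0
            .split_once('@')? // drop the leading "{CONTAINER}@"
            .1
            .split_once(".dfs.core.windows.net")? // drop the DNS suffix
            .0,
    )
}

fn main() {
    // Hypothetical URI, for illustration only.
    let uri = "abfss://my-container@myaccount.dfs.core.windows.net/data/file.parquet";
    assert_eq!(extract_adls_uri_storage_account(uri), Some("myaccount"));

    // Non-ADLS URLs yield None, so the caller can fall back to a configured account name.
    assert_eq!(extract_adls_uri_storage_account("https://example.com/x"), None);

    println!("{:?}", extract_adls_uri_storage_account(uri));
}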