From 0d412b962167f46d86106dfae5b42719190cc809 Mon Sep 17 00:00:00 2001 From: fanng Date: Sun, 1 Sep 2024 19:29:56 +0800 Subject: [PATCH 1/6] support projection pushdown for datafusion iceberg --- Cargo.toml | 1 + crates/examples/Cargo.toml | 6 + .../datafusion/src/physical_plan/scan.rs | 41 ++++++- crates/integrations/datafusion/src/table.rs | 3 +- .../tests/integration_datafusion_test.rs | 109 +++++++++++++++--- 5 files changed, 142 insertions(+), 18 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8d04f6799..a38ce8bca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,7 @@ iceberg = { version = "0.3.0", path = "./crates/iceberg" } iceberg-catalog-rest = { version = "0.3.0", path = "./crates/catalog/rest" } iceberg-catalog-hms = { version = "0.3.0", path = "./crates/catalog/hms" } iceberg-catalog-memory = { version = "0.3.0", path = "./crates/catalog/memory" } +iceberg-datafusion = { version = "0.3.0", path = "./crates/integrations/datafusion" } itertools = "0.13" log = "0.4" mockito = "1" diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index 2fb3060c1..dfed40696 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -27,6 +27,8 @@ rust-version = { workspace = true } [dependencies] iceberg = { workspace = true } iceberg-catalog-rest = { workspace = true } +iceberg-datafusion = { workspace = true } +datafusion = { version = "41.0.0" } tokio = { version = "1", features = ["full"] } [[example]] @@ -36,3 +38,7 @@ path = "src/rest_catalog_namespace.rs" [[example]] name = "rest-catalog-table" path = "src/rest_catalog_table.rs" + +[[example]] +name = "datafusion-read-data" +path = "src/datafusion_read_data.rs" diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index c50b32efb..fe155c396 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -18,6 +18,7 @@ use 
std::any::Any; use std::pin::Pin; use std::sync::Arc; +use std::vec; use datafusion::arrow::array::RecordBatch; use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; @@ -44,17 +45,25 @@ pub(crate) struct IcebergTableScan { /// Stores certain, often expensive to compute, /// plan properties used in query optimization. plan_properties: PlanProperties, + /// Projection column names + projection: Vec, } impl IcebergTableScan { /// Creates a new [`IcebergTableScan`] object. - pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self { + pub(crate) fn new( + table: Table, + schema: ArrowSchemaRef, + projection: Option<&Vec>, + ) -> Self { let plan_properties = Self::compute_properties(schema.clone()); + let projection = get_column_names(schema.clone(), projection); Self { table, schema, plan_properties, + projection, } } @@ -100,7 +109,7 @@ impl ExecutionPlan for IcebergTableScan { _partition: usize, _context: Arc, ) -> DFResult { - let fut = get_batch_stream(self.table.clone()); + let fut = get_batch_stream(self.table.clone(), self.projection.clone()); let stream = futures::stream::once(fut).try_flatten(); Ok(Box::pin(RecordBatchStreamAdapter::new( @@ -116,7 +125,11 @@ impl DisplayAs for IcebergTableScan { _t: datafusion::physical_plan::DisplayFormatType, f: &mut std::fmt::Formatter, ) -> std::fmt::Result { - write!(f, "IcebergTableScan") + write!( + f, + "IcebergTableScan projection:[{}]", + self.projection.join(" ") + ) } } @@ -127,8 +140,13 @@ impl DisplayAs for IcebergTableScan { /// and then converts it into a stream of Arrow [`RecordBatch`]es. 
async fn get_batch_stream( table: Table, + column_names: Vec, ) -> DFResult> + Send>>> { - let table_scan = table.scan().build().map_err(to_datafusion_error)?; + let table_scan = table + .scan() + .select(column_names) + .build() + .map_err(to_datafusion_error)?; let stream = table_scan .to_arrow() @@ -138,3 +156,18 @@ async fn get_batch_stream( Ok(Box::pin(stream)) } + +fn get_column_names(schema: ArrowSchemaRef, projection: Option<&Vec>) -> Vec { + if let Some(projection) = projection { + projection + .iter() + .map(|p| schema.field(*p).name().clone()) + .collect::>() + } else { + schema + .fields() + .iter() + .map(|f| f.name().clone()) + .collect::>() + } +} diff --git a/crates/integrations/datafusion/src/table.rs b/crates/integrations/datafusion/src/table.rs index 7ff7b2211..8d70d9488 100644 --- a/crates/integrations/datafusion/src/table.rs +++ b/crates/integrations/datafusion/src/table.rs @@ -75,13 +75,14 @@ impl TableProvider for IcebergTableProvider { async fn scan( &self, _state: &dyn Session, - _projection: Option<&Vec>, + projection: Option<&Vec>, _filters: &[Expr], _limit: Option, ) -> DFResult> { Ok(Arc::new(IcebergTableScan::new( self.table.clone(), self.schema.clone(), + projection, ))) } } diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index 9e62930fd..3bd8f2ded 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -19,11 +19,13 @@ use std::collections::HashMap; use std::sync::Arc; +use std::vec; +use datafusion::arrow::array::{Array, StringArray}; use datafusion::arrow::datatypes::DataType; use datafusion::execution::context::SessionContext; use iceberg::io::FileIOBuilder; -use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; +use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type}; use iceberg::{Catalog, 
NamespaceIdent, Result, TableCreation}; use iceberg_catalog_memory::MemoryCatalog; use iceberg_datafusion::IcebergCatalogProvider; @@ -39,6 +41,13 @@ fn get_iceberg_catalog() -> MemoryCatalog { MemoryCatalog::new(file_io, Some(temp_path())) } +fn get_struct_type() -> StructType { + StructType::new(vec![ + NestedField::required(13, "s_foo1", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(14, "s_foo2", Type::Primitive(PrimitiveType::String)).into(), + ]) +} + async fn set_test_namespace(catalog: &MemoryCatalog, namespace: &NamespaceIdent) -> Result<()> { let properties = HashMap::new(); @@ -47,14 +56,21 @@ async fn set_test_namespace(catalog: &MemoryCatalog, namespace: &NamespaceIdent) Ok(()) } -fn set_table_creation(location: impl ToString, name: impl ToString) -> Result { - let schema = Schema::builder() - .with_schema_id(0) - .with_fields(vec![ - NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(), - ]) - .build()?; +fn get_table_creation( + location: impl ToString, + name: impl ToString, + schema: Option, +) -> Result { + let schema = match schema { + None => Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "foo1", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "foo2", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build()?, + Some(schema) => schema, + }; let creation = TableCreation::builder() .location(location.to_string()) @@ -72,7 +88,7 @@ async fn test_provider_get_table_schema() -> Result<()> { let namespace = NamespaceIdent::new("test_provider_get_table_schema".to_string()); set_test_namespace(&iceberg_catalog, &namespace).await?; - let creation = set_table_creation(temp_path(), "my_table")?; + let creation = get_table_creation(temp_path(), "my_table", None)?; iceberg_catalog.create_table(&namespace, creation).await?; let client = Arc::new(iceberg_catalog); @@ 
-87,7 +103,7 @@ async fn test_provider_get_table_schema() -> Result<()> { let table = schema.table("my_table").await.unwrap().unwrap(); let table_schema = table.schema(); - let expected = [("foo", &DataType::Int32), ("bar", &DataType::Utf8)]; + let expected = [("foo1", &DataType::Int32), ("foo2", &DataType::Utf8)]; for (field, exp) in table_schema.fields().iter().zip(expected.iter()) { assert_eq!(field.name(), exp.0); @@ -104,7 +120,7 @@ async fn test_provider_list_table_names() -> Result<()> { let namespace = NamespaceIdent::new("test_provider_list_table_names".to_string()); set_test_namespace(&iceberg_catalog, &namespace).await?; - let creation = set_table_creation(temp_path(), "my_table")?; + let creation = get_table_creation(temp_path(), "my_table", None)?; iceberg_catalog.create_table(&namespace, creation).await?; let client = Arc::new(iceberg_catalog); @@ -130,7 +146,6 @@ async fn test_provider_list_schema_names() -> Result<()> { let namespace = NamespaceIdent::new("test_provider_list_schema_names".to_string()); set_test_namespace(&iceberg_catalog, &namespace).await?; - set_table_creation("test_provider_list_schema_names", "my_table")?; let client = Arc::new(iceberg_catalog); let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?); @@ -147,3 +162,71 @@ async fn test_provider_list_schema_names() -> Result<()> { .all(|item| result.contains(&item.to_string()))); Ok(()) } + +#[tokio::test] +async fn test_table_projection() -> Result<()> { + let iceberg_catalog = get_iceberg_catalog(); + let namespace = NamespaceIdent::new("ns".to_string()); + set_test_namespace(&iceberg_catalog, &namespace).await?; + + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "foo1", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "foo2", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(0, "foo3", Type::Struct(get_struct_type())).into(), + ]) + .build()?; + let creation = 
get_table_creation(temp_path(), "t1", Some(schema))?; + iceberg_catalog.create_table(&namespace, creation).await?; + + let client = Arc::new(iceberg_catalog); + let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?); + + let ctx = SessionContext::new(); + ctx.register_catalog("catalog", catalog); + let table_df = ctx.table("catalog.ns.t1").await.unwrap(); + + let records = table_df + .clone() + .explain(false, false) + .unwrap() + .collect() + .await + .unwrap(); + assert_eq!(1, records.len()); + let record = &records[0]; + // the first column is plan_type, the second column plan string. + let s = record + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(2, s.len()); + // the first row is logical_plan, the second row is physical_plan + assert_eq!( + "IcebergTableScan projection:[foo1 foo2 foo3]", + s.value(1).trim() + ); + + // datafusion doesn't support query foo3.s_foo1, use foo3 instead + let records = table_df + .select_columns(&["foo1", "foo3"]) + .unwrap() + .explain(false, false) + .unwrap() + .collect() + .await + .unwrap(); + assert_eq!(1, records.len()); + let record = &records[0]; + let s = record + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(2, s.len()); + assert_eq!("IcebergTableScan projection:[foo1 foo3]", s.value(1).trim()); + + Ok(()) +} From e3a58bec6293a1c6a21794bf08022719b969a56f Mon Sep 17 00:00:00 2001 From: fanng Date: Sun, 1 Sep 2024 19:32:44 +0800 Subject: [PATCH 2/6] support projection pushdown for datafusion iceberg --- crates/examples/src/datafusion_read_data.rs | 37 +++++++++++++++++++++ crates/examples/src/utils.rs | 31 +++++++++++++++++ 2 files changed, 68 insertions(+) create mode 100644 crates/examples/src/datafusion_read_data.rs create mode 100644 crates/examples/src/utils.rs diff --git a/crates/examples/src/datafusion_read_data.rs b/crates/examples/src/datafusion_read_data.rs new file mode 100644 index 000000000..56fac6251 --- /dev/null +++ 
b/crates/examples/src/datafusion_read_data.rs @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use datafusion::prelude::SessionContext; +use iceberg_datafusion::IcebergCatalogProvider; + +mod utils; + +#[tokio::main] +async fn main() { + let iceberg_catalog = utils::get_rest_catalog(); + + let client = Arc::new(iceberg_catalog); + let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await.unwrap()); + + let ctx = SessionContext::new(); + ctx.register_catalog("catalog", catalog); + let df = ctx.sql("select * from catalog.ns.table1").await.unwrap(); + let data = df.collect().await.unwrap(); + println!("{:?}", data); +} diff --git a/crates/examples/src/utils.rs b/crates/examples/src/utils.rs new file mode 100644 index 000000000..2784314f0 --- /dev/null +++ b/crates/examples/src/utils.rs @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::env; + +use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; + +fn get_catalog_uri_from_env() -> String { + env::var("CATALOG_URI").unwrap_or("http://localhost:8080".to_string()) +} + +pub fn get_rest_catalog() -> RestCatalog { + let config = RestCatalogConfig::builder() + .uri(get_catalog_uri_from_env()) + .build(); + RestCatalog::new(config) +} From 3e7426db1e32f55665456d02a6515badcc4a5114 Mon Sep 17 00:00:00 2001 From: fanng Date: Sun, 1 Sep 2024 22:28:11 +0800 Subject: [PATCH 3/6] fix ci --- crates/examples/Cargo.toml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index dfed40696..6db8bb980 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -25,12 +25,16 @@ license = { workspace = true } rust-version = { workspace = true } [dependencies] +datafusion = { version = "41.0.0" } iceberg = { workspace = true } iceberg-catalog-rest = { workspace = true } iceberg-datafusion = { workspace = true } -datafusion = { version = "41.0.0" } tokio = { version = "1", features = ["full"] } +[[example]] +name = "datafusion-read-data" +path = "src/datafusion_read_data.rs" + [[example]] name = "rest-catalog-namespace" path = "src/rest_catalog_namespace.rs" @@ -38,7 +42,3 @@ path = "src/rest_catalog_namespace.rs" [[example]] name = "rest-catalog-table" path = 
"src/rest_catalog_table.rs" - -[[example]] -name = "datafusion-read-data" -path = "src/datafusion_read_data.rs" From a2cbd1533d29760bd623e457716528b7e8a34304 Mon Sep 17 00:00:00 2001 From: fanng Date: Mon, 2 Sep 2024 09:07:11 +0800 Subject: [PATCH 4/6] fix field id --- .../datafusion/tests/integration_datafusion_test.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index 3bd8f2ded..deb957531 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -43,8 +43,8 @@ fn get_iceberg_catalog() -> MemoryCatalog { fn get_struct_type() -> StructType { StructType::new(vec![ - NestedField::required(13, "s_foo1", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::required(14, "s_foo2", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(4, "s_foo1", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(5, "s_foo2", Type::Primitive(PrimitiveType::String)).into(), ]) } @@ -174,7 +174,7 @@ async fn test_table_projection() -> Result<()> { .with_fields(vec![ NestedField::required(1, "foo1", Type::Primitive(PrimitiveType::Int)).into(), NestedField::required(2, "foo2", Type::Primitive(PrimitiveType::String)).into(), - NestedField::optional(0, "foo3", Type::Struct(get_struct_type())).into(), + NestedField::optional(3, "foo3", Type::Struct(get_struct_type())).into(), ]) .build()?; let creation = get_table_creation(temp_path(), "t1", Some(schema))?; From 59fff2522fd98432f3b168274b3523e5a2686774 Mon Sep 17 00:00:00 2001 From: fanng Date: Thu, 5 Sep 2024 21:16:38 +0800 Subject: [PATCH 5/6] remove dependencies --- .../datafusion/src/physical_plan/scan.rs | 38 +++++++++---------- .../tests/integration_datafusion_test.rs | 4 +- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git 
a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index fe155c396..576acea6b 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -45,8 +45,8 @@ pub(crate) struct IcebergTableScan { /// Stores certain, often expensive to compute, /// plan properties used in query optimization. plan_properties: PlanProperties, - /// Projection column names - projection: Vec, + /// Projection column names, None means all columns + projection: Option>, } impl IcebergTableScan { @@ -128,7 +128,9 @@ impl DisplayAs for IcebergTableScan { write!( f, "IcebergTableScan projection:[{}]", - self.projection.join(" ") + self.projection + .clone() + .map_or(String::new(), |v| v.join(",")) ) } } @@ -140,13 +142,13 @@ impl DisplayAs for IcebergTableScan { /// and then converts it into a stream of Arrow [`RecordBatch`]es. async fn get_batch_stream( table: Table, - column_names: Vec, + column_names: Option>, ) -> DFResult> + Send>>> { - let table_scan = table - .scan() - .select(column_names) - .build() - .map_err(to_datafusion_error)?; + let scan_builder = match column_names { + Some(column_names) => table.scan().select(column_names), + None => table.scan().select_all(), + }; + let table_scan = scan_builder.build().map_err(to_datafusion_error)?; let stream = table_scan .to_arrow() @@ -157,17 +159,13 @@ async fn get_batch_stream( Ok(Box::pin(stream)) } -fn get_column_names(schema: ArrowSchemaRef, projection: Option<&Vec>) -> Vec { - if let Some(projection) = projection { - projection - .iter() +fn get_column_names( + schema: ArrowSchemaRef, + projection: Option<&Vec>, +) -> Option> { + projection.map(|v| { + v.iter() .map(|p| schema.field(*p).name().clone()) .collect::>() - } else { - schema - .fields() - .iter() - .map(|f| f.name().clone()) - .collect::>() - } + }) } diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs 
b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index deb957531..d6e22d044 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -205,7 +205,7 @@ async fn test_table_projection() -> Result<()> { assert_eq!(2, s.len()); // the first row is logical_plan, the second row is physical_plan assert_eq!( - "IcebergTableScan projection:[foo1 foo2 foo3]", + "IcebergTableScan projection:[foo1,foo2,foo3]", s.value(1).trim() ); @@ -226,7 +226,7 @@ async fn test_table_projection() -> Result<()> { .downcast_ref::() .unwrap(); assert_eq!(2, s.len()); - assert_eq!("IcebergTableScan projection:[foo1 foo3]", s.value(1).trim()); + assert_eq!("IcebergTableScan projection:[foo1,foo3]", s.value(1).trim()); Ok(()) } From 78fc2fe63a1786bf7813736a05d1e142a5e8f1f6 Mon Sep 17 00:00:00 2001 From: fanng Date: Thu, 5 Sep 2024 21:18:32 +0800 Subject: [PATCH 6/6] remove dependencies --- crates/examples/Cargo.toml | 6 ---- crates/examples/src/datafusion_read_data.rs | 37 --------------------- crates/examples/src/utils.rs | 31 ----------------- 3 files changed, 74 deletions(-) delete mode 100644 crates/examples/src/datafusion_read_data.rs delete mode 100644 crates/examples/src/utils.rs diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index 6db8bb980..2fb3060c1 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -25,16 +25,10 @@ license = { workspace = true } rust-version = { workspace = true } [dependencies] -datafusion = { version = "41.0.0" } iceberg = { workspace = true } iceberg-catalog-rest = { workspace = true } -iceberg-datafusion = { workspace = true } tokio = { version = "1", features = ["full"] } -[[example]] -name = "datafusion-read-data" -path = "src/datafusion_read_data.rs" - [[example]] name = "rest-catalog-namespace" path = "src/rest_catalog_namespace.rs" diff --git a/crates/examples/src/datafusion_read_data.rs 
b/crates/examples/src/datafusion_read_data.rs deleted file mode 100644 index 56fac6251..000000000 --- a/crates/examples/src/datafusion_read_data.rs +++ /dev/null @@ -1,37 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use datafusion::prelude::SessionContext; -use iceberg_datafusion::IcebergCatalogProvider; - -mod utils; - -#[tokio::main] -async fn main() { - let iceberg_catalog = utils::get_rest_catalog(); - - let client = Arc::new(iceberg_catalog); - let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await.unwrap()); - - let ctx = SessionContext::new(); - ctx.register_catalog("catalog", catalog); - let df = ctx.sql("select * from catalog.ns.table1").await.unwrap(); - let data = df.collect().await.unwrap(); - println!("{:?}", data); -} diff --git a/crates/examples/src/utils.rs b/crates/examples/src/utils.rs deleted file mode 100644 index 2784314f0..000000000 --- a/crates/examples/src/utils.rs +++ /dev/null @@ -1,31 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::env; - -use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; - -fn get_catalog_uri_from_env() -> String { - env::var("CATALOG_URI").unwrap_or("http://localhost:8080".to_string()) -} - -pub fn get_rest_catalog() -> RestCatalog { - let config = RestCatalogConfig::builder() - .uri(get_catalog_uri_from_env()) - .build(); - RestCatalog::new(config) -}