From aff70da1147c75a6fb1e31dabf1c575b18d421ac Mon Sep 17 00:00:00 2001
From: WenyXu
Date: Mon, 5 Aug 2024 16:03:08 +0000
Subject: [PATCH] test: add test for async writer

---
 integrations/parquet/Cargo.toml          |  2 ++
 integrations/parquet/src/async_writer.rs | 33 ++++++++++++++++++++++++
 2 files changed, 35 insertions(+)

diff --git a/integrations/parquet/Cargo.toml b/integrations/parquet/Cargo.toml
index 9ca14a7544f7..7c4e4fa89ce2 100644
--- a/integrations/parquet/Cargo.toml
+++ b/integrations/parquet/Cargo.toml
@@ -49,6 +49,8 @@ opendal = { version = "0.48.0", path = "../../core", features = [
 ] }
 rand = "0.8.5"
 tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread"] }
+arrow = { version = "52.0" }
+
 
 [[example]]
 name = "async_writer"
diff --git a/integrations/parquet/src/async_writer.rs b/integrations/parquet/src/async_writer.rs
index adc8b53b31e2..027c9214c05d 100644
--- a/integrations/parquet/src/async_writer.rs
+++ b/integrations/parquet/src/async_writer.rs
@@ -104,8 +104,12 @@ impl AsyncFileWriter for AsyncWriter {
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use super::*;
+    use arrow::array::{ArrayRef, Int64Array, RecordBatch};
     use opendal::{services, Operator};
+    use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
 
     #[tokio::test]
     async fn test_basic() {
@@ -136,4 +140,33 @@ mod tests {
         let exist = op.is_exist(path).await.unwrap();
         assert!(!exist);
     }
+
+    #[tokio::test]
+    async fn test_async_writer() {
+        let operator = Operator::new(services::Memory::default()).unwrap().finish();
+        let path = "/path/to/file.parquet";
+
+        let writer = AsyncWriter::new(
+            operator
+                .writer_with(path)
+                .chunk(32 * 1024 * 1024)
+                .concurrent(8)
+                .await
+                .unwrap(),
+        );
+
+        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
+        let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+        let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
+        writer.write(&to_write).await.unwrap();
+        writer.close().await.unwrap();
+
+        let buffer = operator.read(path).await.unwrap().to_bytes();
+        let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+            .unwrap()
+            .build()
+            .unwrap();
+        let read = reader.next().unwrap().unwrap();
+        assert_eq!(to_write, read);
+    }
 }