Skip to content

Commit

Permalink
test: add test for async writer
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Aug 5, 2024
1 parent 45a1f1d commit aff70da
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 0 deletions.
2 changes: 2 additions & 0 deletions integrations/parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ opendal = { version = "0.48.0", path = "../../core", features = [
] }
rand = "0.8.5"
tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread"] }
arrow = { version = "52.0" }


[[example]]
name = "async_writer"
Expand Down
33 changes: 33 additions & 0 deletions integrations/parquet/src/async_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,12 @@ impl AsyncFileWriter for AsyncWriter {

#[cfg(test)]
mod tests {
use std::sync::Arc;

use super::*;
use arrow::array::{ArrayRef, Int64Array, RecordBatch};
use opendal::{services, Operator};
use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};

#[tokio::test]
async fn test_basic() {
Expand Down Expand Up @@ -136,4 +140,33 @@ mod tests {
let exist = op.is_exist(path).await.unwrap();
assert!(!exist);
}

#[tokio::test]
async fn test_async_writer() {
let operator = Operator::new(services::Memory::default()).unwrap().finish();
let path = "/path/to/file.parquet";

let writer = AsyncWriter::new(
operator
.writer_with(path)
.chunk(32 * 1024 * 1024)
.concurrent(8)
.await
.unwrap(),
);

let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
writer.write(&to_write).await.unwrap();
writer.close().await.unwrap();

let buffer = operator.read(path).await.unwrap().to_bytes();
let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
.unwrap()
.build()
.unwrap();
let read = reader.next().unwrap().unwrap();
assert_eq!(to_write, read);
}
}

0 comments on commit aff70da

Please sign in to comment.