Skip to content
This repository has been archived by the owner on Feb 2, 2023. It is now read-only.

Commit

Permalink
feat: support schema file as command line arg (#71)
Browse files Browse the repository at this point in the history
Co-authored-by: Andy Redhead <[email protected]>
  • Loading branch information
andyredhead and AndyRedhead-EC authored May 16, 2022
1 parent d9f6803 commit da8eca6
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
*.json
*.arrow
*.parquet
.idea/
59 changes: 52 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ use parquet::{
errors::ParquetError,
file::properties::WriterProperties,
};
use serde_json::to_string_pretty;
use serde_json::{to_string_pretty, Value};
use std::fs::File;
use std::io::{BufReader, Seek, SeekFrom};
use std::path::PathBuf;
use std::sync::Arc;

#[derive(clap::ArgEnum, Clone)]
#[allow(non_camel_case_types, clippy::upper_case_acronyms)]
Expand Down Expand Up @@ -45,6 +47,10 @@ struct Opts {
#[clap(name = "PARQUET", parse(from_os_str), value_hint = ValueHint::AnyPath)]
output: PathBuf,

/// File with Arrow schema in JSON format.
#[clap(short = 's', long, parse(from_os_str), value_hint = ValueHint::AnyPath)]
schema_file: Option<PathBuf>,

/// The number of records to infer the schema from. All rows if not present. Setting max-read-records to zero will stop schema inference and all columns will be string typed.
#[clap(long)]
max_read_records: Option<usize>,
Expand Down Expand Up @@ -101,21 +107,60 @@ struct Opts {
fn main() -> Result<(), ParquetError> {
let opts: Opts = Opts::parse();

let input = File::open(opts.input)?;
let builder = ReaderBuilder::new().infer_schema(opts.max_read_records);
let reader = builder.build(input)?;
let mut input = File::open(opts.input)?;

let schema = match opts.schema_file {
Some(schema_def_file_path) => {
let schema_file = match File::open(&schema_def_file_path) {
Ok(file) => Ok(file),
Err(error) => Err(ParquetError::General(format!(
"Error opening schema file: {:?}, message: {}",
schema_def_file_path, error
))),
}?;
let json: serde_json::Result<Value> = serde_json::from_reader(schema_file);
match json {
Ok(schema_json) => match arrow::datatypes::Schema::from(&schema_json) {
Ok(schema) => Ok(schema),
Err(error) => Err(error.into()),
},
Err(err) => Err(ParquetError::General(format!(
"Error reading schema json: {}",
err
))),
}
}
_ => {
let mut buf_reader = BufReader::new(&input);

match arrow::json::reader::infer_json_schema(&mut buf_reader, opts.max_read_records) {
Ok(schema) => {
input.seek(SeekFrom::Start(0))?;
Ok(schema)
}
Err(error) => Err(ParquetError::General(format!(
"Error inferring schema: {}",
error
))),
}
}
}?;

if opts.print_schema || opts.dry {
let json = to_string_pretty(&reader.schema().to_json()).unwrap();
eprintln!("Inferred Schema:\n{}", json);

let json = to_string_pretty(&schema.to_json()).unwrap();
eprintln!("Schema:");
println!("{}", json);
if opts.dry {
return Ok(());
}
}

let output = File::create(opts.output)?;

let schema_ref = Arc::new(schema);
let builder = ReaderBuilder::new().with_schema(schema_ref);
let reader = builder.build(input)?;

let mut props = WriterProperties::builder()
.set_dictionary_enabled(opts.dictionary)
.set_statistics_enabled(opts.statistics);
Expand Down

0 comments on commit da8eca6

Please sign in to comment.