Merge pull request #28 from manojkarthick/schema-json
Support JSON output for schema subcommand
manojkarthick authored May 13, 2022
2 parents d33dca9 + 7e84e50 commit 4af84d1
Showing 5 changed files with 144 additions and 46 deletions.
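This change gives pqrs schema a machine-readable mode alongside the existing default and --detailed outputs. A hypothetical invocation (the output shape follows the ParquetSchema struct introduced below; the file name and all values are illustrative, and key order inside each column object is not fixed because columns are HashMaps):

    $ pqrs schema --json data.parquet
    {"version":1,"num_rows":2,"created_by":"parquet-rs version 12.0.0","metadata":null,"columns":[{"name":"id","path":"id","optional":"false","physical_type":"INT64","converted_type":"NONE"}],"message":"message schema {\n  REQUIRED INT64 id;\n}\n"}

Because the code calls serde_json::to_string rather than to_string_pretty, each file's schema prints as one compact line.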
85 changes: 45 additions & 40 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -20,6 +20,8 @@ arrow = { version = "12.0.0", features = ["chrono-tz"] }
 clap = "3.1.10"
 rand = "0.8.5"
 walkdir = "2.3.2"
+serde = { version = "1.0", features = ["derive"]}
+serde_json = "1.0"
 
 [dev-dependencies]
 assert_cmd = "2.0.4"
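The two new dependencies split the work: serde's derive feature generates the Serialize impl for the schema struct at compile time, and serde_json renders any Serialize value as a JSON string. A minimal sketch of that mechanism, separate from this PR's code (struct and field names hypothetical):

    use serde::Serialize;

    #[derive(Serialize)]
    struct Column {
        name: String,
        converted_type: Option<String>, // a None here serializes as JSON null
    }

    fn main() {
        let col = Column { name: "id".to_string(), converted_type: None };
        // serde_json::to_string produces compact, single-line JSON
        println!("{}", serde_json::to_string(&col).unwrap()); // {"name":"id","converted_type":null}
    }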
78 changes: 72 additions & 6 deletions src/commands/schema.rs
@@ -1,12 +1,16 @@
+use std::collections::HashMap;
+use std::io::BufWriter;
 use crate::errors::PQRSError;
 use crate::errors::PQRSError::FileNotFound;
 use crate::utils::{check_path_present, open_file};
 use clap::Parser;
 use log::debug;
 use parquet::file::reader::FileReader;
 use parquet::file::serialized_reader::SerializedFileReader;
-use parquet::schema::printer::{print_file_metadata, print_parquet_metadata};
+use parquet::schema::printer::{print_file_metadata, print_parquet_metadata, print_schema};
 use std::path::PathBuf;
+use parquet::file::metadata::ParquetMetaData;
+use serde::{Serialize, Deserialize};
 
 /// Prints the schema of Parquet file(s)
 #[derive(Parser, Debug)]
@@ -15,10 +19,59 @@ pub struct SchemaCommandArgs {
     #[clap(short = 'D', long)]
     detailed: bool,
 
+    /// Print in JSON format
+    #[clap(short, long, conflicts_with = "detailed")]
+    json: bool,
+
     /// Parquet files to read
     files: Vec<PathBuf>,
 }
 
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ParquetSchema {
+    version: i32,
+    num_rows: i64,
+    // num_columns: usize,
+    created_by: Option<String>,
+    metadata: Option<HashMap<String, Option<String>>>,
+    columns: Vec<HashMap<String, String>>,
+    message: String,
+}
+
+fn get_schema_metadata(metadata: &ParquetMetaData) -> Option<HashMap<String, Option<String>>> {
+    if let Some(metadata) = metadata.file_metadata().key_value_metadata() {
+        let mut fields: HashMap<String, Option<String>> = HashMap::new();
+        for kv in metadata.iter() {
+            fields.insert(kv.key.to_string(), kv.value.to_owned());
+        }
+        Some(fields)
+    } else {
+        None
+    }
+}
+
+fn get_column_information(metadata: &ParquetMetaData) -> Vec<HashMap<String, String>> {
+    let schema = metadata.file_metadata().schema_descr();
+    let mut columns = Vec::new();
+    for (_i, col) in schema.columns().iter().enumerate() {
+        let mut column_info: HashMap<String, String> = HashMap::new();
+        column_info.insert(String::from("name"), String::from(col.name()));
+        column_info.insert(String::from("path"), col.path().string());
+        column_info.insert(String::from("optional"), col.self_type().is_optional().to_string());
+        column_info.insert(String::from("physical_type"), col.physical_type().to_string());
+        column_info.insert(String::from("converted_type"), col.converted_type().to_string());
+        columns.push(column_info)
+    }
+    columns
+}
+
+fn get_message(metadata: &ParquetMetaData) -> Result<String, PQRSError> {
+    let mut buf = BufWriter::new(Vec::new());
+    print_schema(&mut buf, metadata.file_metadata().schema());
+    let bytes = buf.into_inner()?;
+    Ok(String::from_utf8(bytes)?)
+}
+
 pub(crate) fn execute(opts: SchemaCommandArgs) -> Result<(), PQRSError> {
     debug!("The file names to read are: {:?}", opts.files);
     debug!("Print Detailed output: {}", opts.detailed);
@@ -36,12 +89,25 @@ pub(crate) fn execute(opts: SchemaCommandArgs) -> Result<(), PQRSError> {
             Err(e) => return Err(PQRSError::ParquetError(e)),
             Ok(parquet_reader) => {
                 let metadata = parquet_reader.metadata();
-                println!("Metadata for file: {}", file_name.display());
-                println!();
-                if opts.detailed {
-                    print_parquet_metadata(&mut std::io::stdout(), metadata);
+                if opts.json {
+                    let schema = ParquetSchema {
+                        version: metadata.file_metadata().version(),
+                        num_rows: metadata.file_metadata().num_rows(),
+                        created_by: metadata.file_metadata().created_by().clone(),
+                        metadata: get_schema_metadata(metadata),
+                        columns: get_column_information(metadata),
+                        message: get_message(metadata)?,
+                    };
+                    let schema_json = serde_json::to_string(&schema)?;
+                    println!("{}", schema_json);
                 } else {
-                    print_file_metadata(&mut std::io::stdout(), metadata.file_metadata());
+                    println!("Metadata for file: {}", file_name.display());
+                    println!();
+                    if opts.detailed {
+                        print_parquet_metadata(&mut std::io::stdout(), metadata);
+                    } else {
+                        print_file_metadata(&mut std::io::stdout(), metadata.file_metadata());
+                    }
                 }
             }
         }
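get_message above reuses parquet's own text printer instead of reimplementing schema formatting: print_schema writes into an in-memory BufWriter over a Vec<u8>, and into_inner recovers the bytes for UTF-8 decoding. The capture pattern in isolation, as a rough sketch (render is a hypothetical stand-in for print_schema, which accepts any io::Write):

    use std::io::{BufWriter, Write};

    // Stand-in for parquet's print_schema: writes formatted text to a writer.
    fn render(out: &mut dyn Write) {
        writeln!(out, "message schema {{").unwrap();
        writeln!(out, "  REQUIRED INT64 id;").unwrap();
        writeln!(out, "}}").unwrap();
    }

    fn capture() -> Result<String, Box<dyn std::error::Error>> {
        let mut buf = BufWriter::new(Vec::new());
        render(&mut buf);
        let bytes = buf.into_inner()?; // flushes; failure is an IntoInnerError
        Ok(String::from_utf8(bytes)?)  // failure is a FromUtf8Error
    }

    fn main() {
        print!("{}", capture().unwrap());
    }

Those two failure points are exactly why src/errors.rs below gains the BufferWriteError and UTF8ConvertError variants.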
10 changes: 10 additions & 0 deletions src/errors.rs
@@ -1,9 +1,12 @@
 use arrow::error::ArrowError;
 use parquet::errors::ParquetError;
 use std::io;
+use std::io::{BufWriter, IntoInnerError};
 use std::num::ParseIntError;
 use std::path::PathBuf;
+use std::string::FromUtf8Error;
 use thiserror::Error;
+use serde_json::Error as SerdeJsonError;
 
 #[allow(dead_code)]
 #[derive(Error, Debug)]
@@ -24,4 +27,11 @@ pub enum PQRSError {
     ArrowReadWriteError(#[from] ArrowError),
     #[error("Unsupported operation")]
     UnsupportedOperation(),
+    #[error("Could not convert to/from json")]
+    SerdeJsonError(#[from] SerdeJsonError),
+    #[error("Could not create string from UTF8 bytes")]
+    UTF8ConvertError(#[from] FromUtf8Error),
+    #[error("Could not read/write to buffer")]
+    BufferWriteError(#[from] IntoInnerError<BufWriter<Vec<u8>>>)
+
 }
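Each new variant uses thiserror's #[from] attribute, which generates a From impl for PQRSError, so the ? operator in get_message and execute converts the underlying errors implicitly. The mechanism reduced to a single variant, as a sketch (enum and function names hypothetical):

    use thiserror::Error;

    #[derive(Error, Debug)]
    enum MyError {
        #[error("Could not create string from UTF8 bytes")]
        Utf8(#[from] std::string::FromUtf8Error),
    }

    fn to_text(bytes: Vec<u8>) -> Result<String, MyError> {
        // `?` wraps a FromUtf8Error via the generated From impl
        Ok(String::from_utf8(bytes)?)
    }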