From 86d7c91013f5ebbf4d5196344c203bdaa4c3aaab Mon Sep 17 00:00:00 2001 From: Manoj Karthick Date: Fri, 13 May 2022 15:27:26 -0700 Subject: [PATCH] support json output for schema subcommand --- src/commands/schema.rs | 78 ++++++++++++++++++++++++++++++++++++++---- src/errors.rs | 10 ++++++ 2 files changed, 82 insertions(+), 6 deletions(-) diff --git a/src/commands/schema.rs b/src/commands/schema.rs index c643aa4..ecd7f59 100644 --- a/src/commands/schema.rs +++ b/src/commands/schema.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; +use std::io::BufWriter; use crate::errors::PQRSError; use crate::errors::PQRSError::FileNotFound; use crate::utils::{check_path_present, open_file}; @@ -5,8 +7,10 @@ use clap::Parser; use log::debug; use parquet::file::reader::FileReader; use parquet::file::serialized_reader::SerializedFileReader; -use parquet::schema::printer::{print_file_metadata, print_parquet_metadata}; +use parquet::schema::printer::{print_file_metadata, print_parquet_metadata, print_schema}; use std::path::PathBuf; +use parquet::file::metadata::ParquetMetaData; +use serde::{Serialize, Deserialize}; /// Prints the schema of Parquet file(s) #[derive(Parser, Debug)] @@ -15,10 +19,59 @@ pub struct SchemaCommandArgs { #[clap(short = 'D', long)] detailed: bool, + /// Print in JSON format + #[clap(short, long, conflicts_with = "detailed")] + json: bool, + /// Parquet files to read files: Vec, } +#[derive(Serialize, Deserialize, Debug)] +pub struct ParquetSchema { + version: i32, + num_rows: i64, + // num_columns: usize, + created_by: Option, + metadata: Option>>, + columns: Vec>, + message: String, +} + +fn get_schema_metadata(metadata: &ParquetMetaData) -> Option>> { + if let Some(metadata) = metadata.file_metadata().key_value_metadata() { + let mut fields : HashMap> = HashMap::new(); + for kv in metadata.iter() { + fields.insert(kv.key.to_string(), kv.value.to_owned()); + } + Some(fields) + } else { + None + } +} + +fn get_column_information(metadata: &ParquetMetaData) -> Vec>{ + let schema = metadata.file_metadata().schema_descr(); + let mut columns = Vec::new(); + for (_i, col) in schema.columns().iter().enumerate() { + let mut column_info: HashMap = HashMap::new(); + column_info.insert(String::from("name"), String::from(col.name())); + column_info.insert(String::from("path"), col.path().string()); + column_info.insert(String::from("optional"), col.self_type().is_optional().to_string()); + column_info.insert(String::from("physical_type"), col.physical_type().to_string()); + column_info.insert(String::from("converted_type"), col.converted_type().to_string()); + columns.push(column_info) + } + columns +} + +fn get_message(metadata: &ParquetMetaData) -> Result { + let mut buf = BufWriter::new(Vec::new()); + print_schema(&mut buf, metadata.file_metadata().schema()); + let bytes = buf.into_inner()?; + Ok(String::from_utf8(bytes)?) +} + pub(crate) fn execute(opts: SchemaCommandArgs) -> Result<(), PQRSError> { debug!("The file names to read are: {:?}", opts.files); debug!("Print Detailed output: {}", opts.detailed); @@ -36,12 +89,25 @@ pub(crate) fn execute(opts: SchemaCommandArgs) -> Result<(), PQRSError> { Err(e) => return Err(PQRSError::ParquetError(e)), Ok(parquet_reader) => { let metadata = parquet_reader.metadata(); - println!("Metadata for file: {}", file_name.display()); - println!(); - if opts.detailed { - print_parquet_metadata(&mut std::io::stdout(), metadata); + if opts.json { + let schema = ParquetSchema { + version: metadata.file_metadata().version(), + num_rows: metadata.file_metadata().num_rows(), + created_by: metadata.file_metadata().created_by().clone(), + metadata: get_schema_metadata(metadata), + columns: get_column_information(metadata), + message: get_message(metadata)?, + }; + let schema_json = serde_json::to_string(&schema)?; + println!("{}", schema_json); } else { - print_file_metadata(&mut std::io::stdout(), metadata.file_metadata()); + println!("Metadata for file: {}", file_name.display()); + println!(); + if opts.detailed { + print_parquet_metadata(&mut std::io::stdout(), metadata); + } else { + print_file_metadata(&mut std::io::stdout(), metadata.file_metadata()); + } } } } diff --git a/src/errors.rs b/src/errors.rs index 6d8abc0..b5d4ac9 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,9 +1,12 @@ use arrow::error::ArrowError; use parquet::errors::ParquetError; use std::io; +use std::io::{BufWriter, IntoInnerError}; use std::num::ParseIntError; use std::path::PathBuf; +use std::string::FromUtf8Error; use thiserror::Error; +use serde_json::Error as SerdeJsonError; #[allow(dead_code)] #[derive(Error, Debug)] @@ -24,4 +27,11 @@ pub enum PQRSError { ArrowReadWriteError(#[from] ArrowError), #[error("Unsupported operation")] UnsupportedOperation(), + #[error("Could not convert to/from json")] + SerdeJsonError(#[from] SerdeJsonError), + #[error("Could not create string from UTF8 bytes")] + UTF8ConvertError(#[from] FromUtf8Error), + #[error("Could not read/write to buffer")] + BufferWriteError(#[from] IntoInnerError>>) + }