build: bump deps clap/parquet/arrow #41

Merged · 1 commit · May 5, 2023
1,282 changes: 982 additions & 300 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
@@ -15,9 +15,9 @@ categories = ["command-line-utilities"]
 thiserror = "1.0.30"
 log = "0.4.16"
 env_logger = "0.9.0"
-parquet = { version = "12.0.0", features = ["cli"] }
-arrow = { version = "12.0.0", features = ["chrono-tz"] }
-clap = "3.1.10"
+parquet = { version = "38.0.0", features = ["cli"] }
+arrow = { version = "38.0.0", features = ["chrono-tz"] }
+clap = { version = "4.2.7", features = ["derive"]}
 rand = "0.8.5"
 walkdir = "2.3.2"
 serde = { version = "1.0", features = ["derive"]}
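clap 4 stopped enabling the derive macros by default, which is why the bump adds an explicit `derive` feature. A minimal sketch of the derive API these versions expect (struct and field names are illustrative, not taken from pqrs):

```rust
use clap::Parser;

/// Illustrative CLI for the clap 4 derive API enabled above.
#[derive(Parser, Debug)]
#[command(version, about = "pqrs-style demo")]
struct Cli {
    /// Hypothetical input file argument.
    #[arg(short, long)]
    input: std::path::PathBuf,
}

fn main() {
    let cli = Cli::parse();
    println!("input = {}", cli.input.display());
}
```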
6 changes: 3 additions & 3 deletions src/commands/cat.rs
@@ -13,15 +13,15 @@ use walkdir::WalkDir;
 #[derive(Parser, Debug)]
 pub struct CatCommandArgs {
     /// Use CSV format for printing
-    #[clap(short, long, conflicts_with = "json")]
+    #[arg(short, long, conflicts_with = "json")]
     csv: bool,
 
     /// Use CSV format without a header for printing
-    #[clap(long = "no-header", requires = "csv", conflicts_with = "json")]
+    #[arg(long = "no-header", requires = "csv", conflicts_with = "json")]
     csv_no_header: bool,
 
     /// Use JSON lines format for printing
-    #[clap(short, long, conflicts_with = "csv")]
+    #[arg(short, long, conflicts_with = "csv")]
     json: bool,
 
     /// Parquet files or folders to read from
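In clap 4, `conflicts_with` and `requires` name the argument *ID* (the field name), not the long flag, which is why `csv_no_header` still points at `"csv"` and `"json"`. A quick, hypothetical way to sanity-check those relations with `try_parse_from` (not part of this PR):

```rust
use clap::Parser;

/// Hypothetical probe mirroring the CatCommandArgs relations.
#[derive(Parser, Debug)]
struct Probe {
    #[arg(short, long, conflicts_with = "json")]
    csv: bool,
    #[arg(long = "no-header", requires = "csv", conflicts_with = "json")]
    csv_no_header: bool,
    #[arg(short, long, conflicts_with = "csv")]
    json: bool,
}

fn main() {
    // --csv and --json conflict, so parsing fails.
    assert!(Probe::try_parse_from(["probe", "--csv", "--json"]).is_err());
    // --no-header requires --csv.
    assert!(Probe::try_parse_from(["probe", "--no-header"]).is_err());
    assert!(Probe::try_parse_from(["probe", "--csv", "--no-header"]).is_ok());
}
```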
6 changes: 3 additions & 3 deletions src/commands/head.rs
@@ -9,15 +9,15 @@ use std::path::PathBuf;
 #[derive(Parser, Debug)]
 pub struct HeadCommandArgs {
     /// Use CSV format for printing
-    #[clap(short, long, conflicts_with = "json")]
+    #[arg(short, long, conflicts_with = "json")]
     csv: bool,
 
     /// Use JSON lines format for printing
-    #[clap(short, long, conflicts_with = "csv")]
+    #[arg(short, long, conflicts_with = "csv")]
     json: bool,
 
     /// The number of records to show (default: 5)
-    #[clap(short = 'n', long, default_value = "5")]
+    #[arg(short = 'n', long, default_value = "5")]
     records: usize,
 
     /// Parquet file to read
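clap 4 infers a value parser from the field type, so the string `default_value = "5"` is parsed into `usize` automatically. A minimal sketch, with the struct trimmed to the one field:

```rust
use clap::Parser;

#[derive(Parser, Debug)]
struct Head {
    /// The number of records to show (default: 5)
    #[arg(short = 'n', long, default_value = "5")]
    records: usize,
}

fn main() {
    // With no -n on the command line, the default "5" goes through
    // the inferred usize parser.
    let head = Head::parse_from(["head"]);
    assert_eq!(head.records, 5);
}
```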
4 changes: 2 additions & 2 deletions src/commands/merge.rs
@@ -10,11 +10,11 @@ use std::path::PathBuf;
 #[derive(Parser, Debug)]
 pub struct MergeCommandArgs {
     /// Parquet files to read
-    #[clap(short, long, value_delimiter = ' ', multiple_values = true)]
+    #[arg(short, long, value_delimiter = ' ', num_args = 1..)]
     input: Vec<PathBuf>,
 
     /// Parquet file to write
-    #[clap(short, long)]
+    #[arg(short, long)]
     output: PathBuf,
 }

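`multiple_values = true` was removed in clap 4; `num_args = 1..` is its replacement and states the arity explicitly: one or more values per occurrence. A hedged sketch of the resulting behavior (file names are illustrative):

```rust
use clap::Parser;
use std::path::PathBuf;

#[derive(Parser, Debug)]
struct Merge {
    /// Parquet files to read
    #[arg(short, long, value_delimiter = ' ', num_args = 1..)]
    input: Vec<PathBuf>,
    /// Parquet file to write
    #[arg(short, long)]
    output: PathBuf,
}

fn main() {
    // num_args = 1.. lets -i consume values until the next flag.
    let m = Merge::parse_from(["merge", "-i", "a.parquet", "b.parquet", "-o", "out.parquet"]);
    assert_eq!(m.input.len(), 2);
}
```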
4 changes: 2 additions & 2 deletions src/commands/sample.rs
@@ -9,11 +9,11 @@ use std::path::PathBuf;
 #[derive(Parser, Debug)]
 pub struct SampleCommandArgs {
     /// Use JSON lines format for printing
-    #[clap(short, long)]
+    #[arg(short, long)]
     json: bool,
 
     /// The number of records to sample
-    #[clap(short = 'n', long)]
+    #[arg(short = 'n', long)]
     records: usize,
 
     /// Parquet file to read
49 changes: 34 additions & 15 deletions src/commands/schema.rs
@@ -1,26 +1,28 @@
-use std::collections::HashMap;
-use std::io::BufWriter;
 use crate::errors::PQRSError;
 use crate::errors::PQRSError::FileNotFound;
 use crate::utils::{check_path_present, open_file};
 use clap::Parser;
 use log::debug;
+use parquet::file::metadata::ParquetMetaData;
 use parquet::file::reader::FileReader;
 use parquet::file::serialized_reader::SerializedFileReader;
-use parquet::schema::printer::{print_file_metadata, print_parquet_metadata, print_schema};
+use parquet::schema::printer::{
+    print_file_metadata, print_parquet_metadata, print_schema,
+};
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::io::BufWriter;
 use std::path::PathBuf;
-use parquet::file::metadata::ParquetMetaData;
-use serde::{Serialize, Deserialize};

 /// Prints the schema of Parquet file(s)
 #[derive(Parser, Debug)]
 pub struct SchemaCommandArgs {
     /// Enable printing full file metadata
-    #[clap(short = 'D', long)]
+    #[arg(short = 'D', long)]
     detailed: bool,
 
     /// Print in JSON format
-    #[clap(short, long, conflicts_with = "detailed")]
+    #[arg(short, long, conflicts_with = "detailed")]
     json: bool,
 
     /// Parquet files to read
@@ -38,9 +40,11 @@ pub struct ParquetSchema {
     message: String,
 }
 
-fn get_schema_metadata(metadata: &ParquetMetaData) -> Option<HashMap<String, Option<String>>> {
+fn get_schema_metadata(
+    metadata: &ParquetMetaData,
+) -> Option<HashMap<String, Option<String>>> {
     if let Some(metadata) = metadata.file_metadata().key_value_metadata() {
-        let mut fields : HashMap<String, Option<String>> = HashMap::new();
+        let mut fields: HashMap<String, Option<String>> = HashMap::new();
         for kv in metadata.iter() {
             fields.insert(kv.key.to_string(), kv.value.to_owned());
         }
@@ -50,16 +54,25 @@ fn get_schema_metadata(metadata: &ParquetMetaData) -> Option<HashMap<String, Option<String>>> {
     }
 }

-fn get_column_information(metadata: &ParquetMetaData) -> Vec<HashMap<String, String>>{
+fn get_column_information(metadata: &ParquetMetaData) -> Vec<HashMap<String, String>> {
     let schema = metadata.file_metadata().schema_descr();
     let mut columns = Vec::new();
     for (_i, col) in schema.columns().iter().enumerate() {
         let mut column_info: HashMap<String, String> = HashMap::new();
         column_info.insert(String::from("name"), String::from(col.name()));
         column_info.insert(String::from("path"), col.path().string());
-        column_info.insert(String::from("optional"), col.self_type().is_optional().to_string());
-        column_info.insert(String::from("physical_type"), col.physical_type().to_string());
-        column_info.insert(String::from("converted_type"), col.converted_type().to_string());
+        column_info.insert(
+            String::from("optional"),
+            col.self_type().is_optional().to_string(),
+        );
+        column_info.insert(
+            String::from("physical_type"),
+            col.physical_type().to_string(),
+        );
+        column_info.insert(
+            String::from("converted_type"),
+            col.converted_type().to_string(),
+        );
         columns.push(column_info)
     }
     columns
@@ -93,7 +106,10 @@ pub(crate) fn execute(opts: SchemaCommandArgs) -> Result<(), PQRSError> {
             let schema = ParquetSchema {
                 version: metadata.file_metadata().version(),
                 num_rows: metadata.file_metadata().num_rows(),
-                created_by: metadata.file_metadata().created_by().clone(),
+                created_by: metadata
+                    .file_metadata()
+                    .created_by()
+                    .map(|str| str.to_string()),
                 metadata: get_schema_metadata(metadata),
                 columns: get_column_information(metadata),
                 message: get_message(metadata)?,
@@ -106,7 +122,10 @@ pub(crate) fn execute(opts: SchemaCommandArgs) -> Result<(), PQRSError> {
             if opts.detailed {
                 print_parquet_metadata(&mut std::io::stdout(), metadata);
             } else {
-                print_file_metadata(&mut std::io::stdout(), metadata.file_metadata());
+                print_file_metadata(
+                    &mut std::io::stdout(),
+                    metadata.file_metadata(),
+                );
             }
         }
     }
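Behind the `created_by` change: in parquet 38, `FileMetaData::created_by()` returns `Option<&str>` rather than an owned value, so it has to be mapped to a `String` before it can live in the serializable struct. A minimal sketch of reading the same fields outside pqrs (the file name is illustrative):

```rust
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use std::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?; // illustrative path
    let reader = SerializedFileReader::new(file)?;
    let meta = reader.metadata().file_metadata();

    // created_by() is Option<&str> in parquet 38; convert to owned.
    let created_by: Option<String> = meta.created_by().map(|s| s.to_string());
    println!(
        "version={} rows={} created_by={:?}",
        meta.version(),
        meta.num_rows(),
        created_by
    );
    Ok(())
}
```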
4 changes: 2 additions & 2 deletions src/commands/size.rs
@@ -9,11 +9,11 @@ use std::path::PathBuf;
 #[derive(Parser, Debug)]
 pub struct SizeCommandArgs {
     /// Show pretty, human readable size
-    #[clap(short, long)]
+    #[arg(short, long)]
     pretty: bool,
 
     /// Show compressed size
-    #[clap(short, long)]
+    #[arg(short, long)]
     compressed: bool,
 
     /// Parquet files to read
5 changes: 2 additions & 3 deletions src/errors.rs
@@ -1,12 +1,12 @@
 use arrow::error::ArrowError;
 use parquet::errors::ParquetError;
+use serde_json::Error as SerdeJsonError;
 use std::io;
 use std::io::{BufWriter, IntoInnerError};
 use std::num::ParseIntError;
 use std::path::PathBuf;
 use std::string::FromUtf8Error;
 use thiserror::Error;
-use serde_json::Error as SerdeJsonError;
 
 #[allow(dead_code)]
 #[derive(Error, Debug)]
@@ -32,6 +32,5 @@ pub enum PQRSError {
     #[error("Could not create string from UTF8 bytes")]
     UTF8ConvertError(#[from] FromUtf8Error),
     #[error("Could not read/write to buffer")]
-    BufferWriteError(#[from] IntoInnerError<BufWriter<Vec<u8>>>)
-
+    BufferWriteError(#[from] IntoInnerError<BufWriter<Vec<u8>>>),
 }
8 changes: 4 additions & 4 deletions src/main.rs
@@ -11,21 +11,21 @@ enum Commands {
     Cat(commands::cat::CatCommandArgs),
     Head(commands::head::HeadCommandArgs),
     Merge(commands::merge::MergeCommandArgs),
-    #[clap(alias = "rowcount")]
+    #[command(alias = "rowcount")]
     RowCount(commands::rowcount::RowCountCommandArgs),
     Sample(commands::sample::SampleCommandArgs),
    Schema(commands::schema::SchemaCommandArgs),
     Size(commands::size::SizeCommandArgs),
 }
 
 #[derive(Parser, Debug)]
-#[clap(author, version, about, long_about = None)]
+#[command(author, version, about, long_about = None)]
 struct Args {
     /// Show debug output
-    #[clap(short, long)]
+    #[arg(short, long)]
     debug: bool,
 
-    #[clap(subcommand)]
+    #[command(subcommand)]
     command: Commands,
 }

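clap 4 splits the old catch-all `#[clap(...)]` attribute by target: `#[command(...)]` goes on types and subcommand variants, `#[arg(...)]` on fields. A minimal dispatch sketch under those rules (the enum is trimmed to one variant and the `main` body is illustrative):

```rust
use clap::{Parser, Subcommand};

#[derive(Subcommand, Debug)]
enum Commands {
    /// Variant-level settings use #[command(...)] in clap 4.
    #[command(alias = "rowcount")]
    RowCount,
}

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    /// Field-level settings use #[arg(...)].
    #[arg(short, long)]
    debug: bool,

    #[command(subcommand)]
    command: Commands,
}

fn main() {
    // The alias resolves to the RowCount subcommand.
    let args = Args::parse_from(["pqrs", "rowcount"]);
    match args.command {
        Commands::RowCount => println!("counting rows (debug={})", args.debug),
    }
}
```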
29 changes: 13 additions & 16 deletions src/utils.rs
@@ -2,7 +2,7 @@ use crate::errors::PQRSError;
 use crate::errors::PQRSError::CouldNotOpenFile;
 use arrow::{datatypes::Schema, record_batch::RecordBatch};
 use log::debug;
-use parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader};
+use parquet::arrow::{arrow_reader::ArrowReaderBuilder, ArrowWriter};
 use parquet::file::reader::{FileReader, SerializedFileReader};
 use parquet::record::Row;
 use rand::seq::SliceRandom;
@@ -45,7 +45,7 @@ pub fn check_path_present<P: AsRef<Path>>(file_path: P) -> bool {
 pub fn open_file<P: AsRef<Path>>(file_name: P) -> Result<File, PQRSError> {
     let file_name = file_name.as_ref();
     let path = Path::new(file_name);
-    let file = match File::open(&path) {
+    let file = match File::open(path) {
         Err(_) => return Err(CouldNotOpenFile(file_name.to_path_buf())),
         Ok(f) => f,
     };
@@ -68,12 +68,11 @@ pub fn print_rows(
     num_records: Option<usize>,
     format: Formats,
 ) -> Result<(), PQRSError> {
-    let parquet_reader = Arc::new(SerializedFileReader::new(file)?);
-
     let mut left = num_records;
 
     match format {
         Formats::Default => {
+            let parquet_reader = SerializedFileReader::new(file)?;
             let mut iter = parquet_reader.get_row_iter(None)?;
 
             let mut start: usize = 0;
@@ -90,8 +89,8 @@ }
             }
         }
         Formats::Json => {
-            let mut arrow_reader = ParquetFileArrowReader::new(parquet_reader);
-            let batch_reader = arrow_reader.get_record_reader(8192)?;
+            let arrow_reader = ArrowReaderBuilder::try_new(file)?;
+            let batch_reader = arrow_reader.with_batch_size(8192).build()?;
             let mut writer = arrow::json::LineDelimitedWriter::new(std::io::stdout());
 
             for maybe_batch in batch_reader {
@@ -116,8 +115,8 @@ pub fn print_rows(
             writer.finish()?;
         }
         Formats::Csv => {
-            let mut arrow_reader = ParquetFileArrowReader::new(parquet_reader);
-            let batch_reader = arrow_reader.get_record_reader(8192)?;
+            let arrow_reader = ArrowReaderBuilder::try_new(file)?;
+            let batch_reader = arrow_reader.with_batch_size(8192).build()?;
             let mut writer = arrow::csv::Writer::new(std::io::stdout());
 
             for maybe_batch in batch_reader {
@@ -140,8 +139,8 @@ }
             }
         }
         Formats::CsvNoHeader => {
-            let mut arrow_reader = ParquetFileArrowReader::new(parquet_reader);
-            let batch_reader = arrow_reader.get_record_reader(8192)?;
+            let arrow_reader = ArrowReaderBuilder::try_new(file)?;
+            let batch_reader = arrow_reader.with_batch_size(8192).build()?;
             let writer_builder = arrow::csv::WriterBuilder::new().has_headers(false);
             let mut writer = writer_builder.build(std::io::stdout());
 
@@ -235,12 +234,10 @@ impl Add for ParquetData {
 
 /// Return the row batches, rows and schema for a given parquet file
 pub fn get_row_batches(file: File) -> Result<ParquetData, PQRSError> {
-    // let file = open_file(input)?;
-    let file_reader = SerializedFileReader::new(file).unwrap();
-    let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
+    let arrow_reader = ArrowReaderBuilder::try_new(file)?;
 
-    let schema = arrow_reader.get_schema()?;
-    let record_batch_reader = arrow_reader.get_record_reader(1024)?;
+    let schema = Schema::clone(arrow_reader.schema());
+    let record_batch_reader = arrow_reader.with_batch_size(1024).build()?;
     let mut batches: Vec<RecordBatch> = Vec::new();
 
     let mut rows = 0;
@@ -351,5 +348,5 @@ pub fn get_pretty_size(bytes: i64) -> String {
         return format!("{:.3} TiB", bytes / ONE_TI_B);
     }
 
-    return format!("{:.3} PiB", bytes / ONE_PI_B);
+    format!("{:.3} PiB", bytes / ONE_PI_B)
 }
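The core of the migration: parquet 38 dropped the `ArrowReader` trait and `ParquetFileArrowReader`, replacing them with a builder that takes the `File` directly instead of an `Arc<SerializedFileReader>`. A hedged sketch of the new read path (the file name is illustrative):

```rust
use parquet::arrow::arrow_reader::ArrowReaderBuilder;
use std::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?; // illustrative path
    let builder = ArrowReaderBuilder::try_new(file)?;

    // The schema is available before building; the builder hands out
    // an Arc<Schema>, so clone the Arc if an owned handle is needed.
    let schema = builder.schema().clone();
    println!("{} columns", schema.fields().len());

    // build() consumes the builder and yields an iterator of
    // Result<RecordBatch, ArrowError>.
    let reader = builder.with_batch_size(1024).build()?;
    for batch in reader {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}
```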
6 changes: 3 additions & 3 deletions tests/integration.rs
@@ -7,9 +7,9 @@ static CAT_OUTPUT: &str = r#"{continent: "Europe", country: {name: "France", city: ["Paris", "Nice", "Marseilles", "Cannes"]}}
 {continent: "Europe", country: {name: "Greece", city: ["Athens", "Piraeus", "Hania", "Heraklion", "Rethymnon", "Fira"]}}
 {continent: "North America", country: {name: "Canada", city: ["Toronto", "Vancouver", "St. John's", "Saint John", "Montreal", "Halifax", "Winnipeg", "Calgary", "Saskatoon", "Ottawa", "Yellowknife"]}}
 "#;
-static CAT_JSON_OUTPUT: &str = r#"{"continent":"Europe","country":{"name":"France","city":["Paris","Nice","Marseilles","Cannes"]}}
-{"continent":"Europe","country":{"name":"Greece","city":["Athens","Piraeus","Hania","Heraklion","Rethymnon","Fira"]}}
-{"continent":"North America","country":{"name":"Canada","city":["Toronto","Vancouver","St. John's","Saint John","Montreal","Halifax","Winnipeg","Calgary","Saskatoon","Ottawa","Yellowknife"]}}
+static CAT_JSON_OUTPUT: &str = r#"{"continent":"Europe","country":{"city":["Paris","Nice","Marseilles","Cannes"],"name":"France"}}
+{"continent":"Europe","country":{"city":["Athens","Piraeus","Hania","Heraklion","Rethymnon","Fira"],"name":"Greece"}}
+{"continent":"North America","country":{"city":["Toronto","Vancouver","St. John's","Saint John","Montreal","Halifax","Winnipeg","Calgary","Saskatoon","Ottawa","Yellowknife"],"name":"Canada"}}
 "#;
 static CAT_CSV_OUTPUT: &str = r#"foo,bar
 1,2
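The only change here is the expected key order inside each JSON object ("city" now sorts before "name"): the bumped arrow JSON writer appears to build rows as alphabetically ordered serde_json maps, so the test strings change while the data itself does not.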