Add --numeric-handling option to allow writing numeric as float or string

resolves #10
exyi committed Sep 15, 2023
1 parent b477d06 commit 478d899
Showing 7 changed files with 170 additions and 52 deletions.
48 changes: 26 additions & 22 deletions README.md
@@ -54,7 +54,7 @@ You can also use environment variables `$PGPASSWORD` and `$PGUSER`
* **Basic SQL types**: `text`, `char`, `varchar` and friends, all kinds of `int`s, `bool`, floating point numbers, `timestamp`, `timestamptz`, `date`, `time`, `uuid`
* `interval` - interval has lower precision in Parquet (ms) than in Postgres (µs), so the conversion is lossy. There is an option `--interval-handling=struct` which serializes it differently without rounding.
* **Decimal numeric types**
* `numeric` will have fixed precision according to the `--decimal-scale` and `--decimal-precision` parameters
* `numeric` will have fixed precision according to the `--decimal-scale` and `--decimal-precision` parameters. Alternatively, use `--numeric-handling` to write a float or string instead.
* `money` is always a 64-bit decimal with 2 decimal places
* **`json` and `jsonb`**: by default serialized as a text field with the JSON. `--json-handling` option allows setting parquet LogicalType to [JSON](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#json), but the feature is not widely supported, thus it's disabled by default.
* **`xml`**: serialized as text
@@ -105,6 +105,9 @@ Options:
[possible values: none, snappy, gzip, lzo, brotli, lz4, zstd]
--compression-level <COMPRESSION_LEVEL>
Compression level of the output file compressor. Only relevant for zstd, brotli and gzip. Default: 3
-H, --host <HOST>
Database server host
@@ -124,12 +127,9 @@ Options:
Controls whether to use SSL/TLS to connect to the server
Possible values:
- disable:
Do not use TLS
- prefer:
Attempt to connect with TLS but allow sessions without (default behavior compiled with SSL support)
- require:
Require the use of TLS
- disable: Do not use TLS
- prefer: Attempt to connect with TLS but allow sessions without (default behavior compiled with SSL support)
- require: Require the use of TLS
--macaddr-handling <MACADDR_HANDLING>
How to handle `macaddr` columns
@@ -147,42 +147,46 @@ Options:
[default: text]
Possible values:
- text-marked-as-json:
JSON is stored as a Parquet JSON type. This is essentially the same as text, but with a different ConvertedType, so it may not be supported in all tools
- text:
JSON is stored as a UTF8 text
- text-marked-as-json: JSON is stored as a Parquet JSON type. This is essentially the same as text, but with a different ConvertedType, so it may not be supported in all tools
- text: JSON is stored as UTF8 text
--enum-handling <ENUM_HANDLING>
How to handle enum (Enumerated Type) columns
[default: text]
Possible values:
- text:
Enum is stored as the postgres enum name, Parquet LogicalType is set to ENUM
- plain-text:
Enum is stored as the postgres enum name, Parquet LogicalType is set to String
- int:
Enum is stored as an 32-bit integer (one-based index of the value in the enum definition)
- text: Enum is stored as the postgres enum name, Parquet LogicalType is set to ENUM
- plain-text: Enum is stored as the postgres enum name, Parquet LogicalType is set to String
- int: Enum is stored as a 32-bit integer (the one-based index of the value in the enum definition; for example, in CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy'), the value 'ok' is stored as 2)
--interval-handling <INTERVAL_HANDLING>
How to handle `interval` columns
[default: interval]
Possible values:
- interval:
Enum is stored as the Parquet INTERVAL type. This has lower precision than postgres interval (milliseconds instead of microseconds)
- struct:
Enum is stored as struct { months: i32, days: i32, microseconds: i64 }, exactly as PostgreSQL stores it
- interval: Interval is stored as the Parquet INTERVAL type. This has lower precision than the postgres interval (milliseconds instead of microseconds)
- struct: Interval is stored as struct { months: i32, days: i32, microseconds: i64 }, exactly as PostgreSQL stores it
--numeric-handling <NUMERIC_HANDLING>
How to handle `numeric` columns
[default: decimal]
Possible values:
- decimal: Numeric is stored using the DECIMAL parquet type. Use --decimal-precision and --decimal-scale to set the desired precision and scale
- double: Numeric is converted to float64 (DOUBLE)
- float32: Numeric is converted to float32 (FLOAT)
- string: Convert the numeric to a string and store it as UTF8 text. This option never loses precision. Note that the text "NaN" may be present if NaN is present in the database
--decimal-scale <DECIMAL_SCALE>
How many decimal digits after the decimal point are stored in the Parquet file
[default: 18]
--decimal-precision <DECIMAL_PRECISION>
How many decimal digits are allowed in numeric/DECIMAL column. By default 38, the largest value which fits in 128 bits
How many decimal digits are allowed in a numeric/DECIMAL column. By default 38, the largest value that fits in 128 bits. If <= 9, the column is stored as INT32; if <= 18, the column is stored as INT64; otherwise BYTE_ARRAY
[default: 38]
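For example, a column declared numeric(140, 100) exceeds any 128-bit decimal, so it can either be coerced to the configured precision/scale or exported losslessly as text with --numeric-handling=string. A hypothetical invocation (only --numeric-handling and -H appear on this page; the binary name and the other flag spellings are assumptions):

    pg2parquet export -H localhost --table numeric_types --output-file out.parquet --numeric-handling=string

As the tests added in this commit show, a Postgres NaN becomes NULL in decimal mode (Parquet decimals cannot represent NaN), a floating-point NaN in double/float32 mode, and the literal text "NaN" in string mode.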
16 changes: 15 additions & 1 deletion cli/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions cli/Cargo.toml
@@ -20,6 +20,7 @@ chrono = "0.4.26"
eui48 = "1.1.0"
bit-vec = "0.6.3"
pg_bigdecimal = "0.1.5"
bigdecimal = "0.4.1"
bytes = "1.4.0"
postgres-protocol = "0.6.5"
byteorder = "1.4.3"
20 changes: 20 additions & 0 deletions cli/src/datatypes/numeric.rs
@@ -2,6 +2,7 @@ use std::borrow::Cow;

use parquet::data_type::{ByteArray, ByteArrayType};
use pg_bigdecimal::{PgNumeric, BigDecimal, BigInt};
use bigdecimal::ToPrimitive;

use crate::appenders::{GenericColumnAppender, ColumnAppender, ColumnAppenderBase, DynamicSerializedWriter, new_autoconv_generic_appender, PreprocessExt, PreprocessAppender, UnwrapOptionAppender};
use crate::level_index::LevelIndexList;
@@ -77,6 +78,25 @@ impl<TInner: ColumnAppender<Vec<u8>>> ColumnAppender<PgNumeric> for DecimalBytes
}
}

impl MyFrom<PgNumeric> for f64 {
fn my_from(t: PgNumeric) -> Self {
match t.n {
Some(n) => n.to_string().parse().unwrap(), // for some reason the to_f64 method works poorly (loses more precision)
// Some(n) => n.to_f64().unwrap(),
None => f64::NAN,
}
}
}
impl MyFrom<PgNumeric> for f32 {
fn my_from(t: PgNumeric) -> Self {
match t.n {
Some(n) => n.to_string().parse().unwrap(),
// Some(n) => n.to_f32().unwrap(),
None => f32::NAN,
}
}
}

// #[derive(Clone)]
// struct DecimalIntAppender<TInt: TryFrom<BigInt>, TInner: ColumnAppender<i64>>
// where TInt::Error: std::fmt::Display {
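The commented-out lines above keep the rejected alternative (n.to_f64()) for reference; per the in-code comment, it lost more precision than formatting to text and re-parsing. A minimal standalone sketch for comparing the two conversion paths, assuming the bigdecimal crate pinned in Cargo.toml above (this sketch is not part of the commit):

use std::str::FromStr;
use bigdecimal::{BigDecimal, ToPrimitive};

fn main() {
    // A value with more fractional digits than an f64 can represent.
    let d = BigDecimal::from_str("1.00000000000000000000000000000000000000000001").unwrap();

    // Path taken by this commit: format as text, then let the standard
    // library's f64 parser do the rounding.
    let via_string: f64 = d.to_string().parse().unwrap();

    // Rejected path: bigdecimal's own ToPrimitive conversion.
    let via_to_f64: f64 = d.to_f64().unwrap();

    println!("via string: {via_string}");
    println!("via to_f64: {via_to_f64}");
}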
8 changes: 6 additions & 2 deletions cli/src/main.rs
@@ -4,7 +4,7 @@ use std::{sync::Arc, path::PathBuf, process};

use clap::{Parser, ValueEnum, Command};
use parquet::basic::{ZstdLevel, BrotliLevel, GzipLevel};
use postgres_cloner::{SchemaSettingsMacaddrHandling, SchemaSettingsJsonHandling, SchemaSettingsEnumHandling, SchemaSettingsIntervalHandling};
use postgres_cloner::{SchemaSettingsMacaddrHandling, SchemaSettingsJsonHandling, SchemaSettingsEnumHandling, SchemaSettingsIntervalHandling, SchemaSettingsNumericHandling};

mod postgresutils;
mod myfrom;
@@ -115,10 +115,13 @@ pub struct SchemaSettingsArgs {
/// How to handle `interval` columns
#[arg(long, hide_short_help = true, default_value = "interval")]
interval_handling: SchemaSettingsIntervalHandling,
/// How to handle `numeric` columns
#[arg(long, hide_short_help = true, default_value = "decimal")]
numeric_handling: SchemaSettingsNumericHandling,
/// How many decimal digits after the decimal point are stored in the Parquet file
#[arg(long, hide_short_help = true, default_value_t = 18)]
decimal_scale: i32,
/// How many decimal digits are allowed in numeric/DECIMAL column. By default 38, the largest value which fits in 128 bits.
/// How many decimal digits are allowed in a numeric/DECIMAL column. By default 38, the largest value that fits in 128 bits. If <= 9, the column is stored as INT32; if <= 18, the column is stored as INT64; otherwise BYTE_ARRAY.
#[arg(long, hide_short_help = true, default_value_t = 38)]
decimal_precision: u32,
}
@@ -207,6 +210,7 @@ fn perform_export(args: ExportArgs) {
json_handling: args.schema_settings.json_handling,
enum_handling: args.schema_settings.enum_handling,
interval_handling: args.schema_settings.interval_handling,
numeric_handling: args.schema_settings.numeric_handling,
decimal_scale: args.schema_settings.decimal_scale,
decimal_precision: args.schema_settings.decimal_precision,
};
84 changes: 59 additions & 25 deletions cli/src/postgres_cloner.rs
@@ -14,6 +14,7 @@ use parquet::data_type::{DataType, BoolType, Int32Type, Int64Type, FloatType, Do
use parquet::file::properties::WriterPropertiesPtr;
use parquet::file::writer::SerializedFileWriter;
use parquet::format::TimestampType;
use pg_bigdecimal::PgNumeric;
use postgres::error::SqlState;
use postgres::types::{Kind, Type as PgType, FromSql};
use postgres::{self, Client, RowIter, Row, Column, Statement, NoTls};
@@ -38,6 +39,7 @@ pub struct SchemaSettings {
pub json_handling: SchemaSettingsJsonHandling,
pub enum_handling: SchemaSettingsEnumHandling,
pub interval_handling: SchemaSettingsIntervalHandling,
pub numeric_handling: SchemaSettingsNumericHandling,
pub decimal_scale: i32,
pub decimal_precision: u32,
}
@@ -78,12 +80,26 @@ pub enum SchemaSettingsIntervalHandling {
Struct
}

#[derive(clap::ValueEnum, Clone, Copy, Debug)]
pub enum SchemaSettingsNumericHandling {
/// Numeric is stored using the DECIMAL parquet type. Use --decimal-precision and --decimal-scale to set the desired precision and scale.
Decimal,
/// Numeric is converted to float64 (DOUBLE).
#[clap(alias="float", alias="float64")]
Double,
/// Numeric is converted to float32 (FLOAT).
Float32,
/// Convert the numeric to a string and store it as UTF8 text. This option never loses precision. Note that the text "NaN" may be present if NaN is present in the database.
String
}

pub fn default_settings() -> SchemaSettings {
SchemaSettings {
macaddr_handling: SchemaSettingsMacaddrHandling::Text,
json_handling: SchemaSettingsJsonHandling::Text, // DuckDB doesn't load JSON converted type, so better to use string I guess
enum_handling: SchemaSettingsEnumHandling::Text,
interval_handling: SchemaSettingsIntervalHandling::Interval,
numeric_handling: SchemaSettingsNumericHandling::Decimal,
decimal_scale: 18,
decimal_precision: 38,
}
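A note on the #[clap(alias=...)] attributes above: they let --numeric-handling=float and --numeric-handling=float64 both resolve to Double. A minimal sketch of the same pattern in isolation (hypothetical binary, assuming clap 4 with the derive feature; not part of this commit):

use clap::{Parser, ValueEnum};

#[derive(ValueEnum, Clone, Copy, Debug)]
enum NumericHandling {
    Decimal,
    #[clap(alias = "float", alias = "float64")]
    Double,
    Float32,
    String,
}

#[derive(Parser)]
struct Args {
    /// How to handle `numeric` columns
    #[arg(long, default_value = "decimal")]
    numeric_handling: NumericHandling,
}

fn main() {
    // "--numeric-handling float" and "--numeric-handling float64"
    // both parse to NumericHandling::Double.
    let args = Args::parse();
    println!("{:?}", args.numeric_handling);
}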
@@ -388,31 +404,7 @@ fn map_simple_type<TRow: PgAbstractRow + Clone + 'static>(
"float4" => resolve_primitive::<f32, FloatType, _>(name, c, None, None),
"float8" => resolve_primitive::<f64, DoubleType, _>(name, c, None, None),
"numeric" => {
let scale = s.decimal_scale;
let precision = s.decimal_precision;
let pq_type = if precision <= 9 {
basic::Type::INT32
} else if precision <= 18 {
basic::Type::INT64
} else {
basic::Type::BYTE_ARRAY
};
let schema = ParquetType::primitive_type_builder(name, pq_type)
.with_logical_type(Some(LogicalType::Decimal { scale, precision: precision as i32 }))
.with_precision(precision as i32)
.with_scale(scale)
.build().unwrap();
let cp: DynColumnAppender<TRow> = if pq_type == basic::Type::INT32 {
let appender = new_decimal_int_appender::<i32, Int32Type>(c.definition_level + 1, c.repetition_level, precision, scale);
Box::new(wrap_pg_row_reader(c, appender))
} else if pq_type == basic::Type::INT64 {
let appender = new_decimal_int_appender::<i64, Int64Type>(c.definition_level + 1, c.repetition_level, precision, scale);
Box::new(wrap_pg_row_reader(c, appender))
} else {
let appender = new_decimal_bytes_appender(c.definition_level + 1, c.repetition_level, s.decimal_precision, s.decimal_scale);
Box::new(wrap_pg_row_reader(c, appender))
};
(cp, schema)
resolve_numeric(s, name, c)?
},
"money" => resolve_primitive::<PgMoney, Int64Type, _>(name, c, Some(LogicalType::Decimal { scale: 2, precision: 18 }), None),
"char" => resolve_primitive::<i8, Int32Type, _>(name, c, Some(LogicalType::Integer { bit_width: 8, is_signed: false }), None),
@@ -479,6 +471,48 @@
})
}

fn resolve_numeric<TRow: PgAbstractRow + Clone + 'static>(s: &SchemaSettings, name: &str, c: &ColumnInfo) -> Result<ResolvedColumn<TRow>, String> {
match s.numeric_handling {
SchemaSettingsNumericHandling::Decimal => {
let scale = s.decimal_scale;
let precision = s.decimal_precision;
let pq_type = if precision <= 9 {
basic::Type::INT32
} else if precision <= 18 {
basic::Type::INT64
} else {
basic::Type::BYTE_ARRAY
};
let schema = ParquetType::primitive_type_builder(name, pq_type)
.with_logical_type(Some(LogicalType::Decimal { scale, precision: precision as i32 }))
.with_precision(precision as i32)
.with_scale(scale)
.build().unwrap();
let cp: DynColumnAppender<TRow> = if pq_type == basic::Type::INT32 {
let appender = new_decimal_int_appender::<i32, Int32Type>(c.definition_level + 1, c.repetition_level, precision, scale);
Box::new(wrap_pg_row_reader(c, appender))
} else if pq_type == basic::Type::INT64 {
let appender = new_decimal_int_appender::<i64, Int64Type>(c.definition_level + 1, c.repetition_level, precision, scale);
Box::new(wrap_pg_row_reader(c, appender))
} else {
let appender = new_decimal_bytes_appender(c.definition_level + 1, c.repetition_level, s.decimal_precision, s.decimal_scale);
Box::new(wrap_pg_row_reader(c, appender))
};
Ok((cp, schema))
},

SchemaSettingsNumericHandling::Double =>
Ok(resolve_primitive::<PgNumeric, DoubleType, _>(name, c, None, None)),
SchemaSettingsNumericHandling::Float32 =>
Ok(resolve_primitive::<PgNumeric, FloatType, _>(name, c, None, None)),
SchemaSettingsNumericHandling::String =>
Ok(resolve_primitive_conv::<PgNumeric, ByteArrayType, _, _>(name, c, None, Some(LogicalType::String), None, |v: PgNumeric| match v.n {
Some(n) => ByteArray::my_from(n.to_string()),
None => ByteArray::my_from("NaN".to_string())
}))
}
}

fn resolve_primitive<T: for<'a> FromSql<'a> + Clone + 'static, TDataType, TRow: PgAbstractRow + Clone + 'static>(
name: &str,
c: &ColumnInfo,
45 changes: 43 additions & 2 deletions py-tests/test_basic_types.py
@@ -138,7 +138,7 @@ def test_numeric_i32(self):
duckdb_table = duckdb.read_parquet(file).fetchall()
self.assertEqual(duckdb_table, [
(1, Decimal('1000.000100000000000000'), Decimal('1.000000000000000000')),
(2, None, None ) # parquet doesn't support NaN, so NULL it is
(2, None, None )
])

def test_numeric_i64(self):
@@ -151,7 +151,48 @@ def test_numeric_i64(self):
duckdb_table = duckdb.read_parquet(file).fetchall()
self.assertEqual(duckdb_table, [
(1, Decimal('1000.000100000000000000'), Decimal('1.000000000000000000')),
(2, None, None ) # parquet doesn't support NaN, so NULL it is
(2, None, None )
])
def test_numeric_f64(self):
file = wrappers.create_and_export(
"numeric_types", "id",
"id int, normal numeric(10, 5), high_precision numeric(140, 100)",
"(1, 1000.0001, 1.00000000000000000000000000000000000000000001), (2, 'NaN', 'NaN')",
options=["--numeric-handling=double"]
)
duckdb_table = duckdb.read_parquet(file).fetchall()
self.assertEqual(duckdb_table[0], (1, 1000.0001, 1))
self.assertTrue(math.isnan(duckdb_table[1][1]))
self.assertTrue(math.isnan(duckdb_table[1][2]))
schema = pl.read_parquet(file).schema
self.assertEqual(schema["normal"], pl.Float64)
self.assertEqual(schema["high_precision"], pl.Float64)

def test_numeric_f32(self):
file = wrappers.create_and_export(
"numeric_types", "id",
"id int, normal numeric(10, 5), high_precision numeric(140, 100)",
"(1, 1000.0001, 1.00000000000000000000000000000000000000000001), (2, 'NaN', 'NaN')",
options=["--numeric-handling=float32"]
)
duckdb_table = duckdb.read_parquet(file).fetchall()
self.assertEqual(duckdb_table[0], (1, 1000.0001220703125, 1))
self.assertTrue(math.isnan(duckdb_table[1][1]))
self.assertTrue(math.isnan(duckdb_table[1][2]))
schema = pl.read_parquet(file).schema
self.assertEqual(schema["normal"], pl.Float32)
self.assertEqual(schema["high_precision"], pl.Float32)
def test_numeric_string(self):
file = wrappers.create_and_export(
"numeric_types", "id",
"id int, normal numeric(10, 5), high_precision numeric(140, 100)",
"(1, 1000.0001, 1.00000000000000000000000000000000000000000001), (2, 'NaN', 'NaN')",
options=["--numeric-handling=string"]
)
duckdb_table = duckdb.read_parquet(file).fetchall()
self.assertEqual(duckdb_table, [
(1, '1000.00010', '1.0000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000'),
(2, 'NaN', 'NaN' )
])

def test_bytes(self):
