diff --git a/arrow-cast/src/display.rs b/arrow-cast/src/display.rs index 16fbfb0bbce5..7214321127cf 100644 --- a/arrow-cast/src/display.rs +++ b/arrow-cast/src/display.rs @@ -28,6 +28,13 @@ use arrow_array::*; use arrow_buffer::ArrowNativeType; use arrow_schema::*; use chrono::prelude::SecondsFormat; +use chrono::{DateTime, Utc}; + +fn invalid_cast_error(dt: &str, col_idx: usize, row_idx: usize) -> ArrowError { + ArrowError::CastError(format!( + "Cannot cast to {dt} at col index: {col_idx} row index: {row_idx}" + )) +} macro_rules! make_string { ($array_type:ty, $column: ident, $row: ident) => {{ @@ -133,57 +140,176 @@ macro_rules! make_string_interval_month_day_nano { } macro_rules! make_string_date { - ($array_type:ty, $column: ident, $row: ident) => {{ - let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); + ($array_type:ty, $dt:expr, $column: ident, $col_idx:ident, $row_idx: ident) => {{ + Ok($column + .as_any() + .downcast_ref::<$array_type>() + .ok_or_else(|| invalid_cast_error($dt, $col_idx, $row_idx))? + .value_as_date($row_idx) + .ok_or_else(|| invalid_cast_error($dt, $col_idx, $row_idx))? + .to_string()) + }}; +} - Ok(array - .value_as_date($row) - .map(|d| d.to_string()) - .unwrap_or_else(|| "ERROR CONVERTING DATE".to_string())) +macro_rules! make_string_date_with_format { + ($array_type:ty, $dt:expr, $format: ident, $column: ident, $col_idx:ident, $row_idx: ident) => {{ + Ok($column + .as_any() + .downcast_ref::<$array_type>() + .ok_or_else(|| invalid_cast_error($dt, $col_idx, $row_idx))? + .value_as_datetime($row_idx) + .ok_or_else(|| invalid_cast_error($dt, $col_idx, $row_idx))? + .format($format) + .to_string()) + }}; +} + +macro_rules! handle_string_date { + ($array_type:ty, $dt:expr, $format: ident, $column: ident, $col_idx:ident, $row_idx: ident) => {{ + match $format { + Some(format) => { + make_string_date_with_format!( + $array_type, + $dt, + format, + $column, + $col_idx, + $row_idx + ) + } + None => make_string_date!($array_type, $dt, $column, $col_idx, $row_idx), + } }}; } macro_rules! make_string_time { - ($array_type:ty, $column: ident, $row: ident) => {{ - let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); + ($array_type:ty, $dt:expr, $column: ident, $col_idx:ident, $row_idx: ident) => {{ + Ok($column + .as_any() + .downcast_ref::<$array_type>() + .ok_or_else(|| invalid_cast_error($dt, $col_idx, $row_idx))? + .value_as_time($row_idx) + .ok_or_else(|| invalid_cast_error($dt, $col_idx, $row_idx))? + .to_string()) + }}; +} - Ok(array - .value_as_time($row) - .map(|d| d.to_string()) - .unwrap_or_else(|| "ERROR CONVERTING DATE".to_string())) +macro_rules! make_string_time_with_format { + ($array_type:ty, $dt:expr, $format: ident, $column: ident, $col_idx:ident, $row_idx: ident) => {{ + Ok($column + .as_any() + .downcast_ref::<$array_type>() + .ok_or_else(|| invalid_cast_error($dt, $col_idx, $row_idx))? + .value_as_time($row_idx) + .ok_or_else(|| invalid_cast_error($dt, $col_idx, $row_idx))? + .format($format) + .to_string()) }}; } +macro_rules! handle_string_time { + ($array_type:ty, $dt:expr, $format: ident, $column: ident, $col_idx:ident, $row_idx: ident) => { + match $format { + Some(format) => { + make_string_time_with_format!( + $array_type, + $dt, + format, + $column, + $col_idx, + $row_idx + ) + } + None => make_string_time!($array_type, $dt, $column, $col_idx, $row_idx), + } + }; +} + macro_rules! make_string_datetime { - ($array_type:ty, $column: ident, $row: ident) => {{ - let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); + ($array_type:ty, $dt:expr, $tz_string: ident, $column: ident, $col_idx:ident, $row_idx: ident) => {{ + let array = $column + .as_any() + .downcast_ref::<$array_type>() + .ok_or_else(|| invalid_cast_error($dt, $col_idx, $row_idx))?; + + let s = match $tz_string { + Some(tz_string) => match tz_string.parse::() { + Ok(tz) => array + .value_as_datetime_with_tz($row_idx, tz) + .ok_or_else(|| invalid_cast_error($dt, $col_idx, $row_idx))? + .to_rfc3339_opts(SecondsFormat::AutoSi, true) + .to_string(), + Err(_) => { + let datetime = array + .value_as_datetime($row_idx) + .ok_or_else(|| invalid_cast_error($dt, $col_idx, $row_idx))?; + format!("{:?} (Unknown Time Zone '{}')", datetime, tz_string) + } + }, + None => { + let datetime = array + .value_as_datetime($row_idx) + .ok_or_else(|| invalid_cast_error($dt, $col_idx, $row_idx))?; + format!("{:?}", datetime) + } + }; - Ok(array - .value_as_datetime($row) - .map(|d| format!("{:?}", d)) - .unwrap_or_else(|| "ERROR CONVERTING DATE".to_string())) + Ok(s) }}; } -macro_rules! make_string_datetime_with_tz { - ($array_type:ty, $tz_string: ident, $column: ident, $row: ident) => {{ - let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); - - let s = match $tz_string.parse::() { - Ok(tz) => array - .value_as_datetime_with_tz($row, tz) - .map(|d| format!("{}", d.to_rfc3339_opts(SecondsFormat::AutoSi, true))) - .unwrap_or_else(|| "ERROR CONVERTING DATE".to_string()), - Err(_) => array - .value_as_datetime($row) - .map(|d| format!("{:?} (Unknown Time Zone '{}')", d, $tz_string)) - .unwrap_or_else(|| "ERROR CONVERTING DATE".to_string()), +macro_rules! make_string_datetime_with_format { + ($array_type:ty, $dt:expr, $format: ident, $tz_string: ident, $column: ident, $col_idx:ident, $row_idx: ident) => {{ + let array = $column + .as_any() + .downcast_ref::<$array_type>() + .ok_or_else(|| invalid_cast_error($dt, $col_idx, $row_idx))?; + let datetime = array + .value_as_datetime($row_idx) + .ok_or_else(|| invalid_cast_error($dt, $col_idx, $row_idx))?; + + let s = match $tz_string { + Some(tz_string) => match tz_string.parse::() { + Ok(tz) => { + let utc_time = DateTime::::from_utc(datetime, Utc); + let local_time = utc_time.with_timezone(&tz); + local_time.format($format).to_string() + } + Err(_) => { + format!("{:?} (Unknown Time Zone '{}')", datetime, tz_string) + } + }, + None => datetime.format($format).to_string(), }; Ok(s) }}; } +macro_rules! handle_string_datetime { + ($array_type:ty, $dt:expr, $format: ident, $tz_string: ident, $column: ident, $col_idx:ident, $row_idx: ident) => { + match $format { + Some(format) => make_string_datetime_with_format!( + $array_type, + $dt, + format, + $tz_string, + $column, + $col_idx, + $row_idx + ), + None => make_string_datetime!( + $array_type, + $dt, + $tz_string, + $column, + $col_idx, + $row_idx + ), + } + }; +} + // It's not possible to do array.value($row).to_string() for &[u8], let's format it as hex macro_rules! make_string_hex { ($array_type:ty, $column: ident, $row: ident) => {{ @@ -248,13 +374,14 @@ macro_rules! make_string_from_fixed_size_list { } macro_rules! make_string_from_duration { - ($array_type:ty, $column: ident, $row: ident) => {{ - let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); - - Ok(array - .value_as_duration($row) - .map(|d| d.to_string()) - .unwrap_or_else(|| "ERROR CONVERTING DATE".to_string())) + ($array_type:ty, $dt:expr, $column:ident, $col_idx:ident, $row_idx: ident) => {{ + Ok($column + .as_any() + .downcast_ref::<$array_type>() + .ok_or_else(|| invalid_cast_error($dt, $col_idx, $row_idx))? + .value_as_duration($row_idx) + .ok_or_else(|| invalid_cast_error($dt, $col_idx, $row_idx))? + .to_string()) }}; } @@ -323,126 +450,172 @@ fn append_map_field_string( /// /// Note this function is quite inefficient and is unlikely to be /// suitable for converting large arrays or record batches. -pub fn array_value_to_string( +fn array_value_to_string_internal( column: &ArrayRef, - row: usize, + col_idx: usize, + row_idx: usize, + format: Option<&str>, ) -> Result { - if column.is_null(row) { + if column.is_null(row_idx) { return Ok("".to_string()); } match column.data_type() { - DataType::Utf8 => make_string!(array::StringArray, column, row), - DataType::LargeUtf8 => make_string!(array::LargeStringArray, column, row), - DataType::Binary => make_string_hex!(array::BinaryArray, column, row), - DataType::LargeBinary => make_string_hex!(array::LargeBinaryArray, column, row), + DataType::Utf8 => make_string!(array::StringArray, column, row_idx), + DataType::LargeUtf8 => make_string!(array::LargeStringArray, column, row_idx), + DataType::Binary => make_string_hex!(array::BinaryArray, column, row_idx), + DataType::LargeBinary => { + make_string_hex!(array::LargeBinaryArray, column, row_idx) + } DataType::FixedSizeBinary(_) => { - make_string_hex!(array::FixedSizeBinaryArray, column, row) + make_string_hex!(array::FixedSizeBinaryArray, column, row_idx) } - DataType::Boolean => make_string!(array::BooleanArray, column, row), - DataType::Int8 => make_string!(array::Int8Array, column, row), - DataType::Int16 => make_string!(array::Int16Array, column, row), - DataType::Int32 => make_string!(array::Int32Array, column, row), - DataType::Int64 => make_string!(array::Int64Array, column, row), - DataType::UInt8 => make_string!(array::UInt8Array, column, row), - DataType::UInt16 => make_string!(array::UInt16Array, column, row), - DataType::UInt32 => make_string!(array::UInt32Array, column, row), - DataType::UInt64 => make_string!(array::UInt64Array, column, row), - DataType::Float16 => make_string!(array::Float16Array, column, row), - DataType::Float32 => make_string!(array::Float32Array, column, row), - DataType::Float64 => make_string!(array::Float64Array, column, row), - DataType::Decimal128(..) => make_string_from_decimal(column, row), + DataType::Boolean => make_string!(array::BooleanArray, column, row_idx), + DataType::Int8 => make_string!(array::Int8Array, column, row_idx), + DataType::Int16 => make_string!(array::Int16Array, column, row_idx), + DataType::Int32 => make_string!(array::Int32Array, column, row_idx), + DataType::Int64 => make_string!(array::Int64Array, column, row_idx), + DataType::UInt8 => make_string!(array::UInt8Array, column, row_idx), + DataType::UInt16 => make_string!(array::UInt16Array, column, row_idx), + DataType::UInt32 => make_string!(array::UInt32Array, column, row_idx), + DataType::UInt64 => make_string!(array::UInt64Array, column, row_idx), + DataType::Float16 => make_string!(array::Float16Array, column, row_idx), + DataType::Float32 => make_string!(array::Float32Array, column, row_idx), + DataType::Float64 => make_string!(array::Float64Array, column, row_idx), + DataType::Decimal128(..) => make_string_from_decimal(column, row_idx), DataType::Timestamp(unit, tz_string_opt) if *unit == TimeUnit::Second => { - match tz_string_opt { - Some(tz_string) => make_string_datetime_with_tz!( - array::TimestampSecondArray, - tz_string, - column, - row - ), - None => make_string_datetime!(array::TimestampSecondArray, column, row), - } + handle_string_datetime!( + array::TimestampSecondArray, + "Timestamp", + format, + tz_string_opt, + column, + col_idx, + row_idx + ) } DataType::Timestamp(unit, tz_string_opt) if *unit == TimeUnit::Millisecond => { - match tz_string_opt { - Some(tz_string) => make_string_datetime_with_tz!( - array::TimestampMillisecondArray, - tz_string, - column, - row - ), - None => { - make_string_datetime!(array::TimestampMillisecondArray, column, row) - } - } + handle_string_datetime!( + array::TimestampMillisecondArray, + "Timestamp", + format, + tz_string_opt, + column, + col_idx, + row_idx + ) } DataType::Timestamp(unit, tz_string_opt) if *unit == TimeUnit::Microsecond => { - match tz_string_opt { - Some(tz_string) => make_string_datetime_with_tz!( - array::TimestampMicrosecondArray, - tz_string, - column, - row - ), - None => { - make_string_datetime!(array::TimestampMicrosecondArray, column, row) - } - } + handle_string_datetime!( + array::TimestampMicrosecondArray, + "Timestamp", + format, + tz_string_opt, + column, + col_idx, + row_idx + ) } DataType::Timestamp(unit, tz_string_opt) if *unit == TimeUnit::Nanosecond => { - match tz_string_opt { - Some(tz_string) => make_string_datetime_with_tz!( - array::TimestampNanosecondArray, - tz_string, - column, - row - ), - None => { - make_string_datetime!(array::TimestampNanosecondArray, column, row) - } - } + handle_string_datetime!( + array::TimestampNanosecondArray, + "Timestamp", + format, + tz_string_opt, + column, + col_idx, + row_idx + ) + } + DataType::Date32 => { + handle_string_date!( + array::Date32Array, + "Date32", + format, + column, + col_idx, + row_idx + ) + } + DataType::Date64 => { + handle_string_date!( + array::Date64Array, + "Date64", + format, + column, + col_idx, + row_idx + ) } - DataType::Date32 => make_string_date!(array::Date32Array, column, row), - DataType::Date64 => make_string_date!(array::Date64Array, column, row), DataType::Time32(unit) if *unit == TimeUnit::Second => { - make_string_time!(array::Time32SecondArray, column, row) + handle_string_time!( + array::Time32SecondArray, + "Time32", + format, + column, + col_idx, + row_idx + ) } DataType::Time32(unit) if *unit == TimeUnit::Millisecond => { - make_string_time!(array::Time32MillisecondArray, column, row) + handle_string_time!( + array::Time32MillisecondArray, + "Time32", + format, + column, + col_idx, + row_idx + ) } DataType::Time64(unit) if *unit == TimeUnit::Microsecond => { - make_string_time!(array::Time64MicrosecondArray, column, row) + handle_string_time!( + array::Time64MicrosecondArray, + "Time64", + format, + column, + col_idx, + row_idx + ) } DataType::Time64(unit) if *unit == TimeUnit::Nanosecond => { - make_string_time!(array::Time64NanosecondArray, column, row) + handle_string_time!( + array::Time64NanosecondArray, + "Time64", + format, + column, + col_idx, + row_idx + ) } DataType::Interval(unit) => match unit { IntervalUnit::DayTime => { - make_string_interval_day_time!(column, row) + make_string_interval_day_time!(column, row_idx) } IntervalUnit::YearMonth => { - make_string_interval_year_month!(column, row) + make_string_interval_year_month!(column, row_idx) } IntervalUnit::MonthDayNano => { - make_string_interval_month_day_nano!(column, row) + make_string_interval_month_day_nano!(column, row_idx) } }, - DataType::List(_) => make_string_from_list!(column, row), - DataType::LargeList(_) => make_string_from_large_list!(column, row), + DataType::List(_) => make_string_from_list!(column, row_idx), + DataType::LargeList(_) => make_string_from_large_list!(column, row_idx), DataType::Dictionary(index_type, _value_type) => match **index_type { - DataType::Int8 => dict_array_value_to_string::(column, row), - DataType::Int16 => dict_array_value_to_string::(column, row), - DataType::Int32 => dict_array_value_to_string::(column, row), - DataType::Int64 => dict_array_value_to_string::(column, row), - DataType::UInt8 => dict_array_value_to_string::(column, row), - DataType::UInt16 => dict_array_value_to_string::(column, row), - DataType::UInt32 => dict_array_value_to_string::(column, row), - DataType::UInt64 => dict_array_value_to_string::(column, row), + DataType::Int8 => dict_array_value_to_string::(column, row_idx), + DataType::Int16 => dict_array_value_to_string::(column, row_idx), + DataType::Int32 => dict_array_value_to_string::(column, row_idx), + DataType::Int64 => dict_array_value_to_string::(column, row_idx), + DataType::UInt8 => dict_array_value_to_string::(column, row_idx), + DataType::UInt16 => dict_array_value_to_string::(column, row_idx), + DataType::UInt32 => dict_array_value_to_string::(column, row_idx), + DataType::UInt64 => dict_array_value_to_string::(column, row_idx), _ => Err(ArrowError::InvalidArgumentError(format!( "Pretty printing not supported for {:?} due to index type", column.data_type() ))), }, - DataType::FixedSizeList(_, _) => make_string_from_fixed_size_list!(column, row), + DataType::FixedSizeList(_, _) => { + make_string_from_fixed_size_list!(column, row_idx) + } DataType::Struct(_) => { let st = column .as_any() @@ -458,11 +631,11 @@ pub fn array_value_to_string( s.push('{'); let mut kv_iter = st.columns().iter().zip(st.column_names()); if let Some((col, name)) = kv_iter.next() { - append_struct_field_string(&mut s, name, col, row)?; + append_struct_field_string(&mut s, name, col, row_idx)?; } for (col, name) in kv_iter { s.push_str(", "); - append_struct_field_string(&mut s, name, col, row)?; + append_struct_field_string(&mut s, name, col, row_idx)?; } s.push('}'); @@ -475,7 +648,7 @@ pub fn array_value_to_string( "Repl error: could not convert column to map array.".to_string(), ) })?; - let map_entry = map_array.value(row); + let map_entry = map_array.value(row_idx); let st = map_entry .as_any() .downcast_ref::() @@ -501,20 +674,44 @@ pub fn array_value_to_string( Ok(s) } DataType::Union(field_vec, type_ids, mode) => { - union_to_string(column, row, field_vec, type_ids, mode) + union_to_string(column, row_idx, field_vec, type_ids, mode) } DataType::Duration(unit) => match *unit { TimeUnit::Second => { - make_string_from_duration!(array::DurationSecondArray, column, row) + make_string_from_duration!( + array::DurationSecondArray, + "Duration", + column, + col_idx, + row_idx + ) } TimeUnit::Millisecond => { - make_string_from_duration!(array::DurationMillisecondArray, column, row) + make_string_from_duration!( + array::DurationMillisecondArray, + "Duration", + column, + col_idx, + row_idx + ) } TimeUnit::Microsecond => { - make_string_from_duration!(array::DurationMicrosecondArray, column, row) + make_string_from_duration!( + array::DurationMicrosecondArray, + "Duration", + column, + col_idx, + row_idx + ) } TimeUnit::Nanosecond => { - make_string_from_duration!(array::DurationNanosecondArray, column, row) + make_string_from_duration!( + array::DurationNanosecondArray, + "Duration", + column, + col_idx, + row_idx + ) } }, _ => Err(ArrowError::InvalidArgumentError(format!( @@ -524,6 +721,22 @@ pub fn array_value_to_string( } } +pub fn temporal_array_value_to_string( + column: &ArrayRef, + col_idx: usize, + row_idx: usize, + format: Option<&str>, +) -> Result { + array_value_to_string_internal(column, col_idx, row_idx, format) +} + +pub fn array_value_to_string( + column: &ArrayRef, + row_idx: usize, +) -> Result { + array_value_to_string_internal(column, 0, row_idx, None) +} + /// Converts the value of the union array at `row` to a String fn union_to_string( column: &ArrayRef, diff --git a/arrow-csv/src/writer.rs b/arrow-csv/src/writer.rs index bc11eef2fcf1..94620be6629f 100644 --- a/arrow-csv/src/writer.rs +++ b/arrow-csv/src/writer.rs @@ -63,12 +63,12 @@ //! } //! ``` -use arrow_array::timezone::Tz; use arrow_array::types::*; use arrow_array::*; -use arrow_cast::display::{lexical_to_string, make_string_from_decimal}; +use arrow_cast::display::{ + array_value_to_string, lexical_to_string, temporal_array_value_to_string, +}; use arrow_schema::*; -use chrono::{DateTime, Utc}; use std::io::Write; use crate::map_csv_error; @@ -88,25 +88,6 @@ where lexical_to_string(c.value(i)) } -fn invalid_cast_error(dt: &str, col_index: usize, row_index: usize) -> ArrowError { - ArrowError::CastError(format!( - "Cannot cast to {dt} at col index: {col_index} row index: {row_index}" - )) -} - -macro_rules! write_temporal_value { - ($array:expr, $tpe: ident, $format: expr, $col_index: expr, $row_index: expr, $cast_func: ident, $tpe_name: expr) => {{ - $array - .as_any() - .downcast_ref::<$tpe>() - .ok_or_else(|| invalid_cast_error($tpe_name, $col_index, $row_index))? - .$cast_func($row_index) - .ok_or_else(|| invalid_cast_error($tpe_name, $col_index, $row_index))? - .format($format) - .to_string() - }}; -} - /// A CSV writer #[derive(Debug)] pub struct Writer { @@ -115,17 +96,17 @@ pub struct Writer { /// Whether file should be written with headers. Defaults to `true` has_headers: bool, /// The date format for date arrays - date_format: String, + date_format: Option, /// The datetime format for datetime arrays - datetime_format: String, + datetime_format: Option, /// The timestamp format for timestamp arrays #[allow(dead_code)] - timestamp_format: String, + timestamp_format: Option, /// The timestamp format for timestamp (with timezone) arrays #[allow(dead_code)] - timestamp_tz_format: String, + timestamp_tz_format: Option, /// The time format for time arrays - time_format: String, + time_format: Option, /// Is the beginning-of-writer beginning: bool, /// The value to represent null entries @@ -141,11 +122,11 @@ impl Writer { Writer { writer, has_headers: true, - date_format: DEFAULT_DATE_FORMAT.to_string(), - datetime_format: DEFAULT_TIMESTAMP_FORMAT.to_string(), - time_format: DEFAULT_TIME_FORMAT.to_string(), - timestamp_format: DEFAULT_TIMESTAMP_FORMAT.to_string(), - timestamp_tz_format: DEFAULT_TIMESTAMP_TZ_FORMAT.to_string(), + date_format: Some(DEFAULT_DATE_FORMAT.to_string()), + datetime_format: Some(DEFAULT_TIMESTAMP_FORMAT.to_string()), + time_format: Some(DEFAULT_TIME_FORMAT.to_string()), + timestamp_format: Some(DEFAULT_TIMESTAMP_FORMAT.to_string()), + timestamp_tz_format: Some(DEFAULT_TIMESTAMP_TZ_FORMAT.to_string()), beginning: true, null_value: DEFAULT_NULL_VALUE.to_string(), } @@ -177,88 +158,74 @@ impl Writer { DataType::UInt16 => write_primitive_value::(col, row_index), DataType::UInt32 => write_primitive_value::(col, row_index), DataType::UInt64 => write_primitive_value::(col, row_index), - DataType::Boolean => { - let c = col.as_any().downcast_ref::().unwrap(); - c.value(row_index).to_string() - } - DataType::Utf8 => { - let c = col.as_any().downcast_ref::().unwrap(); - c.value(row_index).to_owned() - } - DataType::LargeUtf8 => { - let c = col.as_any().downcast_ref::().unwrap(); - c.value(row_index).to_owned() - } - DataType::Date32 => { - write_temporal_value!( - col, - Date32Array, - &self.date_format, - col_index, - row_index, - value_as_date, - "Date32" - ) - } - DataType::Date64 => { - write_temporal_value!( + DataType::Boolean => array_value_to_string(col, row_index)?.to_string(), + DataType::Utf8 => array_value_to_string(col, row_index)?.to_string(), + DataType::LargeUtf8 => array_value_to_string(col, row_index)?.to_string(), + DataType::Date32 => temporal_array_value_to_string( + col, + col_index, + row_index, + self.date_format.as_deref(), + )? + .to_string(), + DataType::Date64 => temporal_array_value_to_string( + col, + col_index, + row_index, + self.datetime_format.as_deref(), + )? + .to_string(), + DataType::Time32(TimeUnit::Second) => temporal_array_value_to_string( + col, + col_index, + row_index, + self.time_format.as_deref(), + )? + .to_string(), + DataType::Time32(TimeUnit::Millisecond) => { + temporal_array_value_to_string( col, - Date64Array, - &self.datetime_format, col_index, row_index, - value_as_datetime, - "Date64" - ) + self.time_format.as_deref(), + )? + .to_string() } - DataType::Time32(TimeUnit::Second) => { - write_temporal_value!( + DataType::Time64(TimeUnit::Microsecond) => { + temporal_array_value_to_string( col, - Time32SecondArray, - &self.time_format, col_index, row_index, - value_as_time, - "Time32" - ) + self.time_format.as_deref(), + )? + .to_string() } - DataType::Time32(TimeUnit::Millisecond) => { - write_temporal_value!( + DataType::Time64(TimeUnit::Nanosecond) => temporal_array_value_to_string( + col, + col_index, + row_index, + self.time_format.as_deref(), + )? + .to_string(), + DataType::Timestamp(_, time_zone) => match time_zone { + Some(_tz) => temporal_array_value_to_string( col, - Time32MillisecondArray, - &self.time_format, col_index, row_index, - value_as_time, - "Time32" - ) - } - DataType::Time64(TimeUnit::Microsecond) => { - write_temporal_value!( + self.timestamp_tz_format.as_deref(), + )? + .to_string(), + None => temporal_array_value_to_string( col, - Time64MicrosecondArray, - &self.time_format, col_index, row_index, - value_as_time, - "Time64" - ) + self.timestamp_format.as_deref(), + )? + .to_string(), + }, + DataType::Decimal128(..) => { + array_value_to_string(col, row_index)?.to_string() } - DataType::Time64(TimeUnit::Nanosecond) => { - write_temporal_value!( - col, - Time64NanosecondArray, - &self.time_format, - col_index, - row_index, - value_as_time, - "Time64" - ) - } - DataType::Timestamp(time_unit, time_zone) => { - self.handle_timestamp(time_unit, time_zone.as_ref(), row_index, col)? - } - DataType::Decimal128(..) => make_string_from_decimal(col, row_index)?, t => { // List and Struct arrays not supported by the writer, any // other type needs to be implemented @@ -272,52 +239,6 @@ impl Writer { Ok(()) } - fn handle_timestamp( - &self, - time_unit: &TimeUnit, - time_zone: Option<&String>, - row_index: usize, - col: &ArrayRef, - ) -> Result { - use TimeUnit::*; - let datetime = match time_unit { - Second => col - .as_any() - .downcast_ref::() - .unwrap() - .value_as_datetime(row_index) - .unwrap(), - Millisecond => col - .as_any() - .downcast_ref::() - .unwrap() - .value_as_datetime(row_index) - .unwrap(), - Microsecond => col - .as_any() - .downcast_ref::() - .unwrap() - .value_as_datetime(row_index) - .unwrap(), - Nanosecond => col - .as_any() - .downcast_ref::() - .unwrap() - .value_as_datetime(row_index) - .unwrap(), - }; - - let tz: Option = time_zone.map(|x| x.parse()).transpose()?; - match tz { - Some(tz) => { - let utc_time = DateTime::::from_utc(datetime, Utc); - let local_time = utc_time.with_timezone(&tz); - Ok(local_time.format(&self.timestamp_tz_format).to_string()) - } - None => Ok(datetime.format(&self.timestamp_format).to_string()), - } - } - /// Write a vector of record batches to a writable object pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> { let num_columns = batch.num_columns(); @@ -463,6 +384,19 @@ impl WriterBuilder { self } + /// Use RFC3339 format for date/time/timestamps by clearing all + /// date/time specific formats. + pub fn with_rfc3339(mut self, use_rfc3339: bool) -> Self { + if use_rfc3339 { + self.date_format = None; + self.datetime_format = None; + self.time_format = None; + self.timestamp_format = None; + self.timestamp_tz_format = None; + } + self + } + /// Create a new `Writer` pub fn build(self, writer: W) -> Writer { let delimiter = self.delimiter.unwrap_or(b','); @@ -471,21 +405,11 @@ impl WriterBuilder { Writer { writer, has_headers: self.has_headers, - date_format: self - .date_format - .unwrap_or_else(|| DEFAULT_DATE_FORMAT.to_string()), - datetime_format: self - .datetime_format - .unwrap_or_else(|| DEFAULT_TIMESTAMP_FORMAT.to_string()), - time_format: self - .time_format - .unwrap_or_else(|| DEFAULT_TIME_FORMAT.to_string()), - timestamp_format: self - .timestamp_format - .unwrap_or_else(|| DEFAULT_TIMESTAMP_FORMAT.to_string()), - timestamp_tz_format: self - .timestamp_tz_format - .unwrap_or_else(|| DEFAULT_TIMESTAMP_TZ_FORMAT.to_string()), + date_format: self.date_format, + datetime_format: self.datetime_format, + time_format: self.time_format, + timestamp_format: self.timestamp_format, + timestamp_tz_format: self.timestamp_tz_format, beginning: true, null_value: self .null_value @@ -502,6 +426,12 @@ mod tests { use std::io::{Cursor, Read, Seek}; use std::sync::Arc; + fn invalid_cast_error(dt: &str, col_idx: usize, row_idx: usize) -> ArrowError { + ArrowError::CastError(format!( + "Cannot cast to {dt} at col index: {col_idx} row index: {row_idx}" + )) + } + #[test] fn test_write_csv() { let schema = Schema::new(vec![ @@ -722,6 +652,7 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,foo let mut file = tempfile::tempfile().unwrap(); let mut writer = Writer::new(&mut file); let batches = vec![&batch, &batch]; + for batch in batches { writer .write(batch) @@ -735,4 +666,57 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,foo } drop(writer); } + + #[test] + fn test_write_csv_using_rfc3339() { + let schema = Schema::new(vec![ + Field::new( + "c1", + DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".to_string())), + true, + ), + Field::new("c2", DataType::Timestamp(TimeUnit::Millisecond, None), true), + Field::new("c3", DataType::Date32, false), + Field::new("c4", DataType::Time32(TimeUnit::Second), false), + ]); + + let c1 = TimestampMillisecondArray::from(vec![ + Some(1555584887378), + Some(1635577147000), + ]) + .with_timezone("+00:00".to_string()); + let c2 = TimestampMillisecondArray::from(vec![ + Some(1555584887378), + Some(1635577147000), + ]); + let c3 = Date32Array::from(vec![3, 2]); + let c4 = Time32SecondArray::from(vec![1234, 24680]); + + let batch = RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(c1), Arc::new(c2), Arc::new(c3), Arc::new(c4)], + ) + .unwrap(); + + let mut file = tempfile::tempfile().unwrap(); + + let builder = WriterBuilder::new().with_rfc3339(true); + let mut writer = builder.build(&mut file); + let batches = vec![&batch]; + for batch in batches { + writer.write(batch).unwrap(); + } + drop(writer); + + file.rewind().unwrap(); + let mut buffer: Vec = vec![]; + file.read_to_end(&mut buffer).unwrap(); + + assert_eq!( + "c1,c2,c3,c4 +2019-04-18T10:54:47.378Z,2019-04-18T10:54:47.378,1970-01-04,00:20:34 +2021-10-30T06:59:07Z,2021-10-30T06:59:07,1970-01-03,06:51:20\n", + String::from_utf8(buffer).unwrap() + ); + } } diff --git a/arrow-json/src/writer.rs b/arrow-json/src/writer.rs index 9d241aed3d28..fa7db4b862e9 100644 --- a/arrow-json/src/writer.rs +++ b/arrow-json/src/writer.rs @@ -105,7 +105,7 @@ use arrow_array::types::*; use arrow_array::*; use arrow_schema::*; -use arrow_cast::display::array_value_to_string; +use arrow_cast::display::temporal_array_value_to_string; fn primitive_array_to_json(array: &ArrayRef) -> Result, ArrowError> where @@ -137,6 +137,7 @@ fn struct_array_to_jsonmap_array( row_count, struct_col, inner_col_names[j], + j, )? } Ok(inner_objs) @@ -217,7 +218,7 @@ macro_rules! set_column_by_array_type { } macro_rules! set_temporal_column_by_array_type { - ($array_type:ident, $col_name:ident, $rows:ident, $array:ident, $row_count:ident, $cast_fn:ident) => { + ($col_name:ident, $col_idx:ident, $rows:ident, $array:ident, $row_count:ident) => { $rows .iter_mut() .enumerate() @@ -226,7 +227,10 @@ macro_rules! set_temporal_column_by_array_type { if !$array.is_null(i) { row.insert( $col_name.to_string(), - array_value_to_string($array, i).unwrap().to_string().into(), + temporal_array_value_to_string($array, $col_idx, i, None) + .unwrap() + .to_string() + .into(), ); } }); @@ -260,6 +264,7 @@ fn set_column_for_json_rows( row_count: usize, array: &ArrayRef, col_name: &str, + col_idx: usize, ) -> Result<(), ArrowError> { match array.data_type() { DataType::Int8 => { @@ -311,144 +316,46 @@ fn set_column_for_json_rows( ); } DataType::Date32 => { - set_temporal_column_by_array_type!( - Date32Array, - col_name, - rows, - array, - row_count, - value_as_date - ); + set_temporal_column_by_array_type!(col_name, col_idx, rows, array, row_count); } DataType::Date64 => { - set_temporal_column_by_array_type!( - Date64Array, - col_name, - rows, - array, - row_count, - value_as_date - ); + set_temporal_column_by_array_type!(col_name, col_idx, rows, array, row_count); } DataType::Timestamp(TimeUnit::Second, _) => { - set_temporal_column_by_array_type!( - TimestampSecondArray, - col_name, - rows, - array, - row_count, - value_as_datetime - ); + set_temporal_column_by_array_type!(col_name, col_idx, rows, array, row_count); } DataType::Timestamp(TimeUnit::Millisecond, _) => { - set_temporal_column_by_array_type!( - TimestampMillisecondArray, - col_name, - rows, - array, - row_count, - value_as_datetime - ); + set_temporal_column_by_array_type!(col_name, col_idx, rows, array, row_count); } DataType::Timestamp(TimeUnit::Microsecond, _) => { - set_temporal_column_by_array_type!( - TimestampMicrosecondArray, - col_name, - rows, - array, - row_count, - value_as_datetime - ); + set_temporal_column_by_array_type!(col_name, col_idx, rows, array, row_count); } DataType::Timestamp(TimeUnit::Nanosecond, _) => { - set_temporal_column_by_array_type!( - TimestampNanosecondArray, - col_name, - rows, - array, - row_count, - value_as_datetime - ); + set_temporal_column_by_array_type!(col_name, col_idx, rows, array, row_count); } DataType::Time32(TimeUnit::Second) => { - set_temporal_column_by_array_type!( - Time32SecondArray, - col_name, - rows, - array, - row_count, - value_as_time - ); + set_temporal_column_by_array_type!(col_name, col_idx, rows, array, row_count); } DataType::Time32(TimeUnit::Millisecond) => { - set_temporal_column_by_array_type!( - Time32MillisecondArray, - col_name, - rows, - array, - row_count, - value_as_time - ); + set_temporal_column_by_array_type!(col_name, col_idx, rows, array, row_count); } DataType::Time64(TimeUnit::Microsecond) => { - set_temporal_column_by_array_type!( - Time64MicrosecondArray, - col_name, - rows, - array, - row_count, - value_as_time - ); + set_temporal_column_by_array_type!(col_name, col_idx, rows, array, row_count); } DataType::Time64(TimeUnit::Nanosecond) => { - set_temporal_column_by_array_type!( - Time64NanosecondArray, - col_name, - rows, - array, - row_count, - value_as_time - ); + set_temporal_column_by_array_type!(col_name, col_idx, rows, array, row_count); } DataType::Duration(TimeUnit::Second) => { - set_temporal_column_by_array_type!( - DurationSecondArray, - col_name, - rows, - array, - row_count, - value_as_duration - ); + set_temporal_column_by_array_type!(col_name, col_idx, rows, array, row_count); } DataType::Duration(TimeUnit::Millisecond) => { - set_temporal_column_by_array_type!( - DurationMillisecondArray, - col_name, - rows, - array, - row_count, - value_as_duration - ); + set_temporal_column_by_array_type!(col_name, col_idx, rows, array, row_count); } DataType::Duration(TimeUnit::Microsecond) => { - set_temporal_column_by_array_type!( - DurationMicrosecondArray, - col_name, - rows, - array, - row_count, - value_as_duration - ); + set_temporal_column_by_array_type!(col_name, col_idx, rows, array, row_count); } DataType::Duration(TimeUnit::Nanosecond) => { - set_temporal_column_by_array_type!( - DurationNanosecondArray, - col_name, - rows, - array, - row_count, - value_as_duration - ); + set_temporal_column_by_array_type!(col_name, col_idx, rows, array, row_count); } DataType::Struct(_) => { let inner_objs = @@ -492,7 +399,7 @@ fn set_column_for_json_rows( let slice = array.slice(0, row_count); let hydrated = arrow_cast::cast::cast(&slice, value_type) .expect("cannot cast dictionary to underlying values"); - set_column_for_json_rows(rows, row_count, &hydrated, col_name)?; + set_column_for_json_rows(rows, row_count, &hydrated, col_name, col_idx)?; } DataType::Map(_, _) => { let maparr = as_map_array(array); @@ -558,7 +465,7 @@ pub fn record_batches_to_json_rows( let row_count = batch.num_rows(); for (j, col) in batch.columns().iter().enumerate() { let col_name = schema.field(j).name(); - set_column_for_json_rows(&mut rows[base..], row_count, col, col_name)? + set_column_for_json_rows(&mut rows[base..], row_count, col, col_name, j)? } base += row_count; } diff --git a/arrow/tests/csv.rs b/arrow/tests/csv.rs index 83a279ce4794..5a7c7e962a11 100644 --- a/arrow/tests/csv.rs +++ b/arrow/tests/csv.rs @@ -62,3 +62,48 @@ fn test_export_csv_timestamps() { let right = String::from_utf8(sw).unwrap(); assert_eq!(left, right); } + +#[test] +fn test_export_csv_timestamps_using_rfc3339() { + let schema = Schema::new(vec![ + Field::new( + "c1", + DataType::Timestamp( + TimeUnit::Millisecond, + Some("Australia/Sydney".to_string()), + ), + true, + ), + Field::new("c2", DataType::Timestamp(TimeUnit::Millisecond, None), true), + ]); + + let c1 = TimestampMillisecondArray::from( + // 1555584887 converts to 2019-04-18, 20:54:47 in time zone Australia/Sydney (AEST). + // The offset (difference to UTC) is +10:00. + // 1635577147 converts to 2021-10-30 17:59:07 in time zone Australia/Sydney (AEDT) + // The offset (difference to UTC) is +11:00. Note that daylight savings is in effect on 2021-10-30. + // + vec![Some(1555584887378), Some(1635577147000)], + ) + .with_timezone("Australia/Sydney".to_string()); + let c2 = + TimestampMillisecondArray::from(vec![Some(1555584887378), Some(1635577147000)]); + let batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap(); + + let mut sw = Vec::new(); + let mut writer = arrow_csv::WriterBuilder::new() + .with_rfc3339(true) + .build(&mut sw); + let batches = vec![&batch]; + for batch in batches { + writer.write(batch).unwrap(); + } + drop(writer); + + let left = "c1,c2 +2019-04-18T20:54:47.378+10:00,2019-04-18T10:54:47.378 +2021-10-30T17:59:07+11:00,2021-10-30T06:59:07\n"; + let right = String::from_utf8(sw).unwrap(); + assert_eq!(left, right); +}