Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix panics for out-of-range timestamps #481

Merged
merged 1 commit into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use chrono::{NaiveDateTime, TimeZone};
use std::any::Any;
use std::str::FromStr;
use std::sync::Arc;
use vegafusion_common::arrow::compute::try_unary;
use vegafusion_common::arrow::error::ArrowError;
use vegafusion_common::datafusion_expr::ScalarUDFImpl;
use vegafusion_common::{
arrow::{
array::{ArrayRef, Date32Array, TimestampMillisecondArray},
compute::unary,
datatypes::{DataType, TimeUnit},
},
datafusion_common::{DataFusionError, ScalarValue},
Expand Down Expand Up @@ -73,7 +74,7 @@ impl ScalarUDFImpl for DateToUtcTimestampUDF {
let s_per_day = 60 * 60 * 24_i64;
let date_array = date_array.as_any().downcast_ref::<Date32Array>().unwrap();

let timestamp_array: TimestampMillisecondArray = unary(date_array, |v| {
let timestamp_array: TimestampMillisecondArray = try_unary(date_array, |v| {
// Build naive datetime at midnight of the date (day count converted to seconds)
let seconds = (v as i64) * s_per_day;
let nanoseconds = 0_u32;
Expand All @@ -84,11 +85,11 @@ impl ScalarUDFImpl for DateToUtcTimestampUDF {
let local_datetime = tz
.from_local_datetime(&naive_local_datetime)
.earliest()
.unwrap();
.ok_or(ArrowError::ComputeError("date out of bounds".to_string()))?;

// Get timestamp millis (in UTC)
local_datetime.timestamp_millis()
});
Ok(local_datetime.timestamp_millis())
})?;
let timestamp_array = Arc::new(timestamp_array) as ArrayRef;

// maybe back to scalar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub fn parse_datetime(
dt
} else {
// Handle positive timezone transition by adding 1 hour
let datetime = datetime.with_hour(datetime.hour() + 1).unwrap();
let datetime = datetime.with_hour(datetime.hour() + 1)?;
local_tz.from_local_datetime(&datetime).earliest()?
};
let dt_utc = dt.with_timezone(&chrono::Utc);
Expand Down
113 changes: 79 additions & 34 deletions vegafusion-datafusion-udfs/src/udfs/datetime/timeunit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use std::any::Any;
use std::str::FromStr;
use std::sync::Arc;
use vegafusion_common::arrow::array::{ArrayRef, Int64Array, TimestampMillisecondArray};
use vegafusion_common::arrow::compute::unary;
use vegafusion_common::arrow::compute::try_unary;
use vegafusion_common::arrow::datatypes::{DataType, TimeUnit};
use vegafusion_common::arrow::error::ArrowError;
use vegafusion_common::arrow::temporal_conversions::date64_to_datetime;
use vegafusion_common::datafusion_common::{DataFusionError, ScalarValue};
use vegafusion_common::datafusion_expr::{
Expand Down Expand Up @@ -66,36 +67,52 @@ fn perform_timeunit_start_from_utc<T: TimeZone>(
value: i64,
units_mask: &[bool],
in_tz: T,
) -> DateTime<T> {
) -> Result<DateTime<T>, ArrowError> {
// Load and interpret date time as UTC
let dt_value = date64_to_datetime(value)
.unwrap()
.with_nanosecond(0)
.unwrap();
let dt_value = Utc.from_local_datetime(&dt_value).earliest().unwrap();
.and_then(|d| d.with_nanosecond(0))
.ok_or(ArrowError::ComputeError("date out of bounds".to_string()))?;

let dt_value =
Utc.from_local_datetime(&dt_value)
.earliest()
.ok_or(ArrowError::ComputeError(
"Failed to convert to UTC".to_string(),
))?;

let mut dt_value = dt_value.with_timezone(&in_tz);

// Handle time truncation
if !units_mask[7] {
// Clear hours first to avoid any of the other time truncations from landing on a daylight
// savings boundary
dt_value = dt_value.with_hour(0).unwrap();
dt_value = dt_value
.with_hour(0)
.ok_or(ArrowError::ComputeError("Failed to drop hours".to_string()))?;
}

if !units_mask[10] {
// Milliseconds
let new_ns = (((dt_value.nanosecond() as f64) / 1e6).floor() * 1e6) as u32;
dt_value = dt_value.with_nanosecond(new_ns).unwrap();
dt_value = dt_value
.with_nanosecond(new_ns)
.ok_or(ArrowError::ComputeError(
"Failed to set nanoseconds".to_string(),
))?;
}

if !units_mask[9] {
// Seconds
dt_value = dt_value.with_second(0).unwrap();
dt_value = dt_value.with_second(0).ok_or(ArrowError::ComputeError(
"Failed to set seconds".to_string(),
))?;
}

if !units_mask[8] {
// Minutes
dt_value = dt_value.with_minute(0).unwrap();
dt_value = dt_value.with_minute(0).ok_or(ArrowError::ComputeError(
"Failed to set minutes".to_string(),
))?;
}

// Save off day of the year and weekday here, because these will change if the
Expand All @@ -115,11 +132,11 @@ fn perform_timeunit_start_from_utc<T: TimeZone>(
let hour = dt_value.hour();
dt_value
.with_hour(0)
.unwrap()
.with_year(2012)
.unwrap()
.with_hour(hour + 1)
.unwrap()
.and_then(|dt| dt.with_year(2012))
.and_then(|dt| dt.with_hour(hour + 1))
.ok_or(ArrowError::ComputeError(
"Failed to handle daylight savings boundary".to_string(),
))?
}
}

Expand All @@ -131,35 +148,42 @@ fn perform_timeunit_start_from_utc<T: TimeZone>(
let new_month = ((dt_value.month0() as f64 / 3.0).floor() * 3.0) as u32;
dt_value = dt_value
.with_day0(0)
.unwrap()
.with_month0(new_month)
.unwrap();
.and_then(|dt| dt.with_month0(new_month))
.ok_or(ArrowError::ComputeError(
"Failed to truncate to quarter".to_string(),
))?;
} else if units_mask[2] {
// Month and not Date
// Truncate to first day of the month
if !units_mask[3] {
dt_value = dt_value.with_day0(0).unwrap();
dt_value = dt_value.with_day0(0).ok_or(ArrowError::ComputeError(
"Failed to truncate to first day of the month".to_string(),
))?;
}
} else if units_mask[3] {
// Date and not Month
// Normalize to January, keeping existing day of the month.
// (January has 31 days, so this is safe)
if !units_mask[2] {
dt_value = dt_value.with_month0(0).unwrap();
dt_value = dt_value.with_month0(0).ok_or(ArrowError::ComputeError(
"Failed to truncate to first day of the month".to_string(),
))?;
}
} else if units_mask[4] {
// Week
// Step 1: Find the date of the first Sunday in the same calendar year as the date.
// This is either the Sunday of the first ISO week (week 1, named isoweek0 here), or the
// Sunday of the final ISO week of the previous year

let isoweek0_sunday = NaiveDate::from_isoywd_opt(dt_value.year(), 1, Weekday::Sun)
.expect("invalid or out-of-range datetime");
let isoweek0_sunday = NaiveDate::from_isoywd_opt(dt_value.year(), 1, Weekday::Sun).ok_or(
ArrowError::ComputeError("invalid or out-of-range datetime".to_string()),
)?;

let isoweek0_sunday = NaiveDateTime::new(isoweek0_sunday, dt_value.time());
let isoweek0_sunday = in_tz
.from_local_datetime(&isoweek0_sunday)
.earliest()
.unwrap();
.ok_or(ArrowError::ComputeError(
"invalid or out-of-range datetime".to_string(),
))?;

// Subtract one week from isoweek0_sunday and check if it's still in the same calendar
// year
Expand Down Expand Up @@ -188,11 +212,15 @@ fn perform_timeunit_start_from_utc<T: TimeZone>(
// (which is January 1st)
let first_sunday_of_2012 = in_tz
.from_local_datetime(&NaiveDateTime::new(
NaiveDate::from_ymd_opt(2012, 1, 1).expect("invalid or out-of-range datetime"),
NaiveDate::from_ymd_opt(2012, 1, 1).ok_or(ArrowError::ComputeError(
"invalid or out-of-range datetime".to_string(),
))?,
dt_value.time(),
))
.earliest()
.unwrap();
.ok_or(ArrowError::ComputeError(
"invalid or out-of-range datetime".to_string(),
))?;

dt_value = first_sunday_of_2012 + chrono::Duration::weeks(week_number);
} else {
Expand All @@ -207,20 +235,34 @@ fn perform_timeunit_start_from_utc<T: TimeZone>(
} else {
NaiveDate::from_isoywd_opt(dt_value.year(), 2, weekday)
}
.expect("invalid or out-of-range datetime");
.ok_or(ArrowError::ComputeError(
"invalid or out-of-range datetime".to_string(),
))?;

let new_datetime = NaiveDateTime::new(new_date, dt_value.time());
dt_value = in_tz.from_local_datetime(&new_datetime).earliest().unwrap();
dt_value =
in_tz
.from_local_datetime(&new_datetime)
.earliest()
.ok_or(ArrowError::ComputeError(
"invalid or out-of-range datetime".to_string(),
))?;
} else if units_mask[6] {
// DayOfYear
// Keep the same day of the year
dt_value = dt_value.with_ordinal0(ordinal0).unwrap();
dt_value = dt_value
.with_ordinal0(ordinal0)
.ok_or(ArrowError::ComputeError(
"invalid or out-of-range datetime".to_string(),
))?;
} else {
// Clear month and date
dt_value = dt_value.with_ordinal0(0).unwrap();
dt_value = dt_value.with_ordinal0(0).ok_or(ArrowError::ComputeError(
"invalid or out-of-range datetime".to_string(),
))?;
}

dt_value
Ok(dt_value)
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -296,9 +338,12 @@ impl ScalarUDFImpl for TimeunitStartUDF {
let (timestamp, tz, units_mask) = unpack_timeunit_udf_args(args)?;

let array = timestamp.as_any().downcast_ref::<Int64Array>().unwrap();
let result_array: TimestampMillisecondArray = unary(array, |value| {
perform_timeunit_start_from_utc(value, units_mask.as_slice(), tz).timestamp_millis()
});
let result_array: TimestampMillisecondArray = try_unary(array, |value| {
Ok(
perform_timeunit_start_from_utc(value, units_mask.as_slice(), tz)?
.timestamp_millis(),
)
})?;

Ok(ColumnarValue::Array(Arc::new(result_array) as ArrayRef))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ pub fn to_utc_timestamp(timestamp_array: ArrayRef, tz: Tz) -> Result<ArrayRef, D
} else {
// Try adding 1 hour to handle daylight savings boundaries
let hour = naive_local_datetime.hour();
let new_naive_local_datetime =
naive_local_datetime.with_hour(hour + 1).unwrap();
let new_naive_local_datetime = naive_local_datetime.with_hour(hour + 1)?;
tz.from_local_datetime(&new_naive_local_datetime).earliest()
};

Expand Down
Loading