diff --git a/e2e_test/batch/functions/session_timezone.slt b/e2e_test/batch/functions/session_timezone.slt index 736018c09f648..1e018f2980503 100644 --- a/e2e_test/batch/functions/session_timezone.slt +++ b/e2e_test/batch/functions/session_timezone.slt @@ -5,13 +5,13 @@ set timezone = "us/pacific"; query T select '2022-01-01'::date::timestamp with time zone; ---- -2022-01-01 08:00:00+00:00 +2022-01-01 00:00:00-08:00 # Cast timestamp to timestamptz query T select '2022-01-01 00:00:00'::timestamp::timestamp with time zone; ---- -2022-01-01 08:00:00+00:00 +2022-01-01 00:00:00-08:00 # Cast timestamptz to timestamp query T @@ -47,7 +47,7 @@ t query T select '2022-01-01 00:00:00'::timestamp with time zone; ---- -2022-01-01 08:00:00+00:00 +2022-01-01 00:00:00-08:00 # Cast timestamptz to varchar, should display with timezone information query T diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index 809dbff3fc367..e272fae7e2d22 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -150,6 +150,7 @@ pub async fn handle_query( local_execute(session.clone(), query, query_snapshot).await?, column_types, format, + session.clone(), )), // Local mode do not support cancel tasks. QueryMode::Distributed => { @@ -157,6 +158,7 @@ pub async fn handle_query( distribute_execute(session.clone(), query, query_snapshot).await?, column_types, format, + session.clone(), )) } } diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index b10c8afe54b63..86a309766107c 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use bytes::Bytes; @@ -28,6 +29,9 @@ use risingwave_common::catalog::{ColumnDesc, Field}; use risingwave_common::error::Result as RwResult; use risingwave_common::row::Row as _; use risingwave_common::types::{DataType, ScalarRefImpl}; +use risingwave_expr::vector_op::timestamptz::timestamptz_to_string; + +use crate::session::SessionImpl; pin_project! { /// Wrapper struct that converts a stream of DataChunk to a stream of RowSet based on formatting @@ -44,17 +48,33 @@ pin_project! { chunk_stream: VS, column_types: Vec, format: bool, + session_data: StaticSessionData, } } + +// Static session data frozen at the time of the creation of the stream +struct StaticSessionData { + timezone: String, +} + impl DataChunkToRowSetAdapter where VS: Stream>, { - pub fn new(chunk_stream: VS, column_types: Vec, format: bool) -> Self { + pub fn new( + chunk_stream: VS, + column_types: Vec, + format: bool, + session: Arc, + ) -> Self { + let session_data = StaticSessionData { + timezone: session.config().get_timezone().into(), + }; Self { chunk_stream, column_types, format, + session_data, } } } @@ -72,7 +92,7 @@ where Poll::Ready(chunk) => match chunk { Some(chunk_result) => match chunk_result { Ok(chunk) => Poll::Ready(Some( - to_pg_rows(this.column_types, chunk, *this.format) + to_pg_rows(this.column_types, chunk, *this.format, this.session_data) .map_err(|err| err.into()), )), Err(err) => Poll::Ready(Some(Err(err))), @@ -84,17 +104,45 @@ where } /// Format scalars according to postgres convention. -fn pg_value_format(data_type: &DataType, d: ScalarRefImpl<'_>, format: bool) -> RwResult { +fn pg_value_format( + data_type: &DataType, + d: ScalarRefImpl<'_>, + format: bool, + session_data: &StaticSessionData, +) -> RwResult { // format == false means TEXT format // format == true means BINARY format if !format { - Ok(d.text_format(data_type).into()) + if *data_type == DataType::Timestamptz { + Ok(timestamptz_to_string_with_session_data(d, session_data)) + } else { + Ok(d.text_format(data_type).into()) + } } else { d.binary_format(data_type) } } -fn to_pg_rows(column_types: &[DataType], chunk: DataChunk, format: bool) -> RwResult> { +fn timestamptz_to_string_with_session_data( + d: ScalarRefImpl<'_>, + session_data: &StaticSessionData, +) -> Bytes { + let mut buf = String::new(); + match d { + ScalarRefImpl::<'_>::Int64(d) => { + timestamptz_to_string(d, &session_data.timezone, &mut buf).unwrap() + } + _ => unreachable!(), + }; + buf.into() +} + +fn to_pg_rows( + column_types: &[DataType], + chunk: DataChunk, + format: bool, + session_data: &StaticSessionData, +) -> RwResult> { chunk .rows() .map(|r| { @@ -102,7 +150,7 @@ fn to_pg_rows(column_types: &[DataType], chunk: DataChunk, format: bool) -> RwRe .iter() .zip_eq(column_types) .map(|(data, t)| match data { - Some(data) => Some(pg_value_format(t, data, format)).transpose(), + Some(data) => Some(pg_value_format(t, data, format, session_data)).transpose(), None => Ok(None), }) .try_collect()?; @@ -163,6 +211,9 @@ mod tests { 3 7 7.01 vvv 4 . . . ", ); + let static_session = StaticSessionData { + timezone: "UTC".into(), + }; let rows = to_pg_rows( &[ DataType::Int32, @@ -172,6 +223,7 @@ mod tests { ], chunk, false, + &static_session, ); let expected: Vec>> = vec![ vec![ @@ -201,8 +253,11 @@ mod tests { #[test] fn test_value_format() { use {DataType as T, ScalarRefImpl as S}; + let static_session = StaticSessionData { + timezone: "UTC".into(), + }; - let f = |t, d, f| pg_value_format(t, d, f).unwrap(); + let f = |t, d, f| pg_value_format(t, d, f, &static_session).unwrap(); assert_eq!(&f(&T::Float32, S::Float32(1_f32.into()), false), "1"); assert_eq!(&f(&T::Float32, S::Float32(f32::NAN.into()), false), "NaN"); assert_eq!(&f(&T::Float64, S::Float64(f64::NAN.into()), false), "NaN"); @@ -224,5 +279,9 @@ mod tests { ); assert_eq!(&f(&T::Boolean, S::Bool(true), false), "t"); assert_eq!(&f(&T::Boolean, S::Bool(false), false), "f"); + assert_eq!( + &f(&T::Timestamptz, S::Int64(-1), false), + "1969-12-31 23:59:59.999999+00:00" + ); } }