diff --git a/Cargo.lock b/Cargo.lock index cd5df85e12eb3..152ccb99abc1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6048,6 +6048,7 @@ dependencies = [ "futures-async-stream", "globset", "google-cloud-pubsub", + "hex", "http", "http-serde", "hyper", diff --git a/src/common/src/types/interval.rs b/src/common/src/types/interval.rs index eca7ec5814187..20e4474915661 100644 --- a/src/common/src/types/interval.rs +++ b/src/common/src/types/interval.rs @@ -843,6 +843,28 @@ impl ToText for crate::types::Interval { } } +impl Interval { + pub fn as_iso_8601(&self) -> String { + // ISO pattern - PnYnMnDTnHnMnS + let years = self.months / 12; + let months = self.months % 12; + let days = self.days; + let secs_fract = (self.usecs % USECS_PER_SEC).abs(); + let total_secs = (self.usecs / USECS_PER_SEC).abs(); + let hours = total_secs / 3600; + let minutes = (total_secs / 60) % 60; + let seconds = total_secs % 60; + let mut buf = [0u8; 7]; + let fract_str = if secs_fract != 0 { + write!(buf.as_mut_slice(), ".{:06}", secs_fract).unwrap(); + std::str::from_utf8(&buf).unwrap().trim_end_matches('0') + } else { + "" + }; + format!("P{years}Y{months}M{days}DT{hours}H{minutes}M{seconds}{fract_str}S") + } +} + impl Display for Interval { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let years = self.months / 12; diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index afa0535ecaeb7..a4e9c04290a3f 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -34,6 +34,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = "0.2" globset = "0.4.8" google-cloud-pubsub = "0.7.0" +hex = "0.4" http = "0.2" http-serde = "1.1.0" hyper = "0.14" diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 32c38d8bed6f1..36e3ba985e7ea 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -411,11 +411,40 @@ pub fn chunk_to_json(chunk: StreamChunk, schema: &Schema) -> Result> fn fields_to_json(fields: &[Field]) -> Value { let mut res = Vec::new(); + fields.iter().for_each(|field| { + // mapping from 'https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-data-types' + let r#type = match field.data_type() { + risingwave_common::types::DataType::Boolean => "boolean", + risingwave_common::types::DataType::Int16 => "int16", + risingwave_common::types::DataType::Int32 => "int32", + risingwave_common::types::DataType::Int64 => "int64", + risingwave_common::types::DataType::Float32 => "float32", + risingwave_common::types::DataType::Float64 => "float64", + // currently, we only support handling decimal as string. + // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-decimal-types + risingwave_common::types::DataType::Decimal => "string", + + risingwave_common::types::DataType::Varchar => "string", + + risingwave_common::types::DataType::Date => "int32", + risingwave_common::types::DataType::Time => "int64", + risingwave_common::types::DataType::Timestamp => "int64", + risingwave_common::types::DataType::Timestamptz => "string", + risingwave_common::types::DataType::Interval => "string", + + risingwave_common::types::DataType::Bytea => "bytes", + risingwave_common::types::DataType::Jsonb => "string", + risingwave_common::types::DataType::Serial => "int32", + // since the original debezium pg support HSTORE via encoded as json string by default, + // we do the same here + risingwave_common::types::DataType::Struct(_) => "string", + risingwave_common::types::DataType::List { .. } => "string", + }; res.push(json!({ "field": field.name, "optional": true, - "type": field.type_name, + "type": r#type, })) }); @@ -654,7 +683,7 @@ mod test { let json_chunk = chunk_to_json(chunk, &schema).unwrap(); let schema_json = schema_to_json(&schema); - assert_eq!(schema_json.to_string(), "{\"fields\":[{\"field\":\"before\",\"fields\":[{\"field\":\"v1\",\"optional\":true,\"type\":\"\"},{\"field\":\"v2\",\"optional\":true,\"type\":\"\"},{\"field\":\"v3\",\"optional\":true,\"type\":\"\"}],\"optional\":true,\"type\":\"struct\"},{\"field\":\"after\",\"fields\":[{\"field\":\"v1\",\"optional\":true,\"type\":\"\"},{\"field\":\"v2\",\"optional\":true,\"type\":\"\"},{\"field\":\"v3\",\"optional\":true,\"type\":\"\"}],\"optional\":true,\"type\":\"struct\"}],\"optional\":false,\"type\":\"struct\"}"); + assert_eq!(schema_json.to_string(), "{\"fields\":[{\"field\":\"before\",\"fields\":[{\"field\":\"v1\",\"optional\":true,\"type\":\"int32\"},{\"field\":\"v2\",\"optional\":true,\"type\":\"float32\"},{\"field\":\"v3\",\"optional\":true,\"type\":\"string\"}],\"optional\":true,\"type\":\"struct\"},{\"field\":\"after\",\"fields\":[{\"field\":\"v1\",\"optional\":true,\"type\":\"int32\"},{\"field\":\"v2\",\"optional\":true,\"type\":\"float32\"},{\"field\":\"v3\",\"optional\":true,\"type\":\"string\"}],\"optional\":true,\"type\":\"struct\"}],\"optional\":false,\"type\":\"struct\"}"); assert_eq!( json_chunk[0].as_str(), "{\"v1\":0,\"v2\":0.0,\"v3\":{\"v4\":0,\"v5\":0.0}}" diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 11ecb5b67ac71..3552290c245d4 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -22,6 +22,7 @@ use std::collections::HashMap; use anyhow::anyhow; use async_trait::async_trait; +use chrono::{Datelike, Timelike}; use enum_as_inner::EnumAsInner; use risingwave_common::array::{ArrayError, ArrayResult, RowRef, StreamChunk}; use risingwave_common::catalog::{Field, Schema}; @@ -306,19 +307,27 @@ fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult { - // fixme json!(v.to_text()) } - ( - dt @ DataType::Date - | dt @ DataType::Time - | dt @ DataType::Timestamp - | dt @ DataType::Timestamptz - | dt @ DataType::Interval - | dt @ DataType::Bytea, - scalar, - ) => { - json!(scalar.to_text_with_type(&dt)) + (DataType::Timestamptz, ScalarRefImpl::Timestamp(v)) => { + json!(v.0.and_local_timezone(chrono::Utc).unwrap().to_rfc3339()) + } + (DataType::Time, ScalarRefImpl::Time(v)) => { + // todo: just ignore the nanos part to avoid leap second complex + json!(v.0.num_seconds_from_midnight() as i64 * 1000) + } + (DataType::Date, ScalarRefImpl::Date(v)) => { + json!(v.0.num_days_from_ce()) + } + (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => { + json!(v.0.timestamp_millis()) + } + (DataType::Bytea, ScalarRefImpl::Bytea(v)) => { + json!(hex::encode(v)) + } + // PYMDTHMS + (DataType::Interval, ScalarRefImpl::Interval(v)) => { + json!(v.as_iso_8601()) } (DataType::List { datatype }, ScalarRefImpl::List(list_ref)) => { let mut vec = Vec::with_capacity(list_ref.values_ref().len()); @@ -351,3 +360,112 @@ fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult