From 5a16857d5e69391dc201ffe486ce4e800cafad4e Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Thu, 27 Feb 2025 16:15:28 +0800 Subject: [PATCH 1/4] reproduce connection ref unparse bug with sqlparser tests --- src/sqlparser/tests/testdata/create.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/sqlparser/tests/testdata/create.yaml b/src/sqlparser/tests/testdata/create.yaml index 8390cc980cc25..e759f5a464293 100644 --- a/src/sqlparser/tests/testdata/create.yaml +++ b/src/sqlparser/tests/testdata/create.yaml @@ -138,6 +138,8 @@ formatted_sql: CREATE SINK snk FROM mv WITH (connector = 'kafka', properties.bootstrap.server = '127.0.0.1:9092', topic = 'test_topic') FORMAT PLAIN ENCODE JSON - input: CREATE SINK snk FROM mv WITH (connector = 'kafka', properties.bootstrap.server = '127.0.0.1:9092', topic = 'test_topic') format upsert encode protobuf (schema.location = 'location', message = 'main_message'); formatted_sql: CREATE SINK snk FROM mv WITH (connector = 'kafka', properties.bootstrap.server = '127.0.0.1:9092', topic = 'test_topic') FORMAT UPSERT ENCODE PROTOBUF (schema.location = 'location', message = 'main_message') +- input: CREATE SINK snk FROM mv WITH (connector = 'kafka', connection = my_kafka_conn, topic = 'test_topic') format plain encode json; + formatted_sql: CREATE SINK snk FROM mv WITH (connector = 'kafka', connection = connection my_kafka_conn, topic = 'test_topic') FORMAT PLAIN ENCODE JSON - input: CREATE SINK snk into t FROM MV formatted_sql: CREATE SINK snk INTO t FROM MV - input: CREATE SINK snk into t AS SELECT * FROM t From 4e4efb15e712bf8ae1c6dbdb20c866861637f6d4 Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Thu, 27 Feb 2025 16:30:36 +0800 Subject: [PATCH 2/4] unnecessary Debug --- src/sqlparser/src/ast/mod.rs | 14 +------------- src/sqlparser/tests/testdata/create.yaml | 10 +++++----- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index c66b4e4acb4ac..c5d41523f2dd6 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -2822,7 +2822,7 @@ impl fmt::Display for SqlOption { } } -#[derive(Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum SqlOptionValue { Value(Value), @@ -2830,18 +2830,6 @@ pub enum SqlOptionValue { ConnectionRef(ConnectionRefValue), } -impl fmt::Debug for SqlOptionValue { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - SqlOptionValue::Value(value) => write!(f, "{:?}", value), - SqlOptionValue::SecretRef(secret_ref) => write!(f, "secret {:?}", secret_ref), - SqlOptionValue::ConnectionRef(connection_ref) => { - write!(f, "connection {:?}", connection_ref) - } - } - } -} - impl fmt::Display for SqlOptionValue { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { diff --git a/src/sqlparser/tests/testdata/create.yaml b/src/sqlparser/tests/testdata/create.yaml index e759f5a464293..a7b5e62088766 100644 --- a/src/sqlparser/tests/testdata/create.yaml +++ b/src/sqlparser/tests/testdata/create.yaml @@ -43,19 +43,19 @@ ^ - input: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.location = 'file://') formatted_sql: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.location = 'file://') - formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), format_encode: V2(FormatEncodeOptions { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "location", quote_style: None }]), value: SingleQuotedString("file://") }], key_encode: None }), source_watermarks: [], include_column_options: [] } }' + formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: Value(SingleQuotedString("abc")) }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: Value(SingleQuotedString("localhost:1001")) }]), format_encode: V2(FormatEncodeOptions { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: Value(SingleQuotedString("Foo")) }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "location", quote_style: None }]), value: Value(SingleQuotedString("file://")) }], key_encode: None }), source_watermarks: [], include_column_options: [] } }' - input: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://') formatted_sql: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://') - formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), format_encode: V2(FormatEncodeOptions { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }], key_encode: None }), source_watermarks: [], include_column_options: [] } }' + formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: Value(SingleQuotedString("abc")) }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: Value(SingleQuotedString("localhost:1001")) }]), format_encode: V2(FormatEncodeOptions { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: Value(SingleQuotedString("Foo")) }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: Value(SingleQuotedString("http://")) }], key_encode: None }), source_watermarks: [], include_column_options: [] } }' - input: CREATE SOURCE IF NOT EXISTS src (*, WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND) WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://') formatted_sql: CREATE SOURCE IF NOT EXISTS src (*, WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND) WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://') - formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: Some(0), constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), format_encode: V2(FormatEncodeOptions { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "event_time", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "event_time", quote_style: None }), op: Minus, right: Value(Interval { value: "60", leading_field: Some(Second), leading_precision: None, last_field: None, fractional_seconds_precision: None }) } }], include_column_options: [] } }' + formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: Some(0), constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: Value(SingleQuotedString("abc")) }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: Value(SingleQuotedString("localhost:1001")) }]), format_encode: V2(FormatEncodeOptions { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: Value(SingleQuotedString("Foo")) }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: Value(SingleQuotedString("http://")) }], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "event_time", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "event_time", quote_style: None }), op: Minus, right: Value(Interval { value: "60", leading_field: Some(Second), leading_precision: None, last_field: None, fractional_seconds_precision: None }) } }], include_column_options: [] } }' - input: CREATE SOURCE IF NOT EXISTS src (PRIMARY KEY (event_id), WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND) WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://') formatted_sql: CREATE SOURCE IF NOT EXISTS src (PRIMARY KEY (event_id), WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND) WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://') - formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: None, constraints: [Unique { name: None, columns: [Ident { value: "event_id", quote_style: None }], is_primary: true }], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), format_encode: V2(FormatEncodeOptions { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "event_time", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "event_time", quote_style: None }), op: Minus, right: Value(Interval { value: "60", leading_field: Some(Second), leading_precision: None, last_field: None, fractional_seconds_precision: None }) } }], include_column_options: [] } }' + formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: None, constraints: [Unique { name: None, columns: [Ident { value: "event_id", quote_style: None }], is_primary: true }], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: Value(SingleQuotedString("abc")) }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: Value(SingleQuotedString("localhost:1001")) }]), format_encode: V2(FormatEncodeOptions { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: Value(SingleQuotedString("Foo")) }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: Value(SingleQuotedString("http://")) }], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "event_time", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "event_time", quote_style: None }), op: Minus, right: Value(Interval { value: "60", leading_field: Some(Second), leading_precision: None, last_field: None, fractional_seconds_precision: None }) } }], include_column_options: [] } }' - input: CREATE SOURCE bid (auction INTEGER, bidder INTEGER, price INTEGER, WATERMARK FOR auction AS auction - 1, "date_time" TIMESTAMP) with (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') formatted_sql: CREATE SOURCE bid (auction INT, bidder INT, price INT, "date_time" TIMESTAMP, WATERMARK FOR auction AS auction - 1) WITH (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') FORMAT NATIVE ENCODE NATIVE - formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: false, columns: [ColumnDef { name: Ident { value: "auction", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "bidder", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "price", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "date_time", quote_style: Some(''"'') }, data_type: Some(Timestamp(false)), collation: None, options: [] }], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "bid", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "connector", quote_style: None }]), value: SingleQuotedString("nexmark") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "table", quote_style: None }, Ident { value: "type", quote_style: None }]), value: SingleQuotedString("Bid") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "split", quote_style: None }, Ident { value: "num", quote_style: None }]), value: SingleQuotedString("12") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "min", quote_style: None }, Ident { value: "event", quote_style: None }, Ident { value: "gap", quote_style: None }, Ident { value: "in", quote_style: None }, Ident { value: "ns", quote_style: None }]), value: SingleQuotedString("0") }]), format_encode: V2(FormatEncodeOptions { format: Native, row_encode: Native, row_options: [], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "auction", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "auction", quote_style: None }), op: Minus, right: Value(Number("1")) } }], include_column_options: [] } }' + formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: false, columns: [ColumnDef { name: Ident { value: "auction", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "bidder", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "price", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "date_time", quote_style: Some(''"'') }, data_type: Some(Timestamp(false)), collation: None, options: [] }], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "bid", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "connector", quote_style: None }]), value: Value(SingleQuotedString("nexmark")) }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "table", quote_style: None }, Ident { value: "type", quote_style: None }]), value: Value(SingleQuotedString("Bid")) }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "split", quote_style: None }, Ident { value: "num", quote_style: None }]), value: Value(SingleQuotedString("12")) }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "min", quote_style: None }, Ident { value: "event", quote_style: None }, Ident { value: "gap", quote_style: None }, Ident { value: "in", quote_style: None }, Ident { value: "ns", quote_style: None }]), value: Value(SingleQuotedString("0")) }]), format_encode: V2(FormatEncodeOptions { format: Native, row_encode: Native, row_options: [], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "auction", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "auction", quote_style: None }), op: Minus, right: Value(Number("1")) } }], include_column_options: [] } }' - input: |- CREATE SOURCE s (raw BYTEA) From 50a42c4eaaa64218dcdf61ddfe4449ba78c5a285 Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Thu, 27 Feb 2025 16:36:42 +0800 Subject: [PATCH 3/4] fix connection unparse --- src/sqlparser/src/ast/mod.rs | 2 +- src/sqlparser/tests/testdata/create.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index c5d41523f2dd6..54f0e7dbe15a0 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -2836,7 +2836,7 @@ impl fmt::Display for SqlOptionValue { SqlOptionValue::Value(value) => write!(f, "{}", value), SqlOptionValue::SecretRef(secret_ref) => write!(f, "secret {}", secret_ref), SqlOptionValue::ConnectionRef(connection_ref) => { - write!(f, "connection {}", connection_ref) + write!(f, "{}", connection_ref) } } } diff --git a/src/sqlparser/tests/testdata/create.yaml b/src/sqlparser/tests/testdata/create.yaml index a7b5e62088766..ce5ff8d05f5e9 100644 --- a/src/sqlparser/tests/testdata/create.yaml +++ b/src/sqlparser/tests/testdata/create.yaml @@ -139,7 +139,7 @@ - input: CREATE SINK snk FROM mv WITH (connector = 'kafka', properties.bootstrap.server = '127.0.0.1:9092', topic = 'test_topic') format upsert encode protobuf (schema.location = 'location', message = 'main_message'); formatted_sql: CREATE SINK snk FROM mv WITH (connector = 'kafka', properties.bootstrap.server = '127.0.0.1:9092', topic = 'test_topic') FORMAT UPSERT ENCODE PROTOBUF (schema.location = 'location', message = 'main_message') - input: CREATE SINK snk FROM mv WITH (connector = 'kafka', connection = my_kafka_conn, topic = 'test_topic') format plain encode json; - formatted_sql: CREATE SINK snk FROM mv WITH (connector = 'kafka', connection = connection my_kafka_conn, topic = 'test_topic') FORMAT PLAIN ENCODE JSON + formatted_sql: CREATE SINK snk FROM mv WITH (connector = 'kafka', connection = my_kafka_conn, topic = 'test_topic') FORMAT PLAIN ENCODE JSON - input: CREATE SINK snk into t FROM MV formatted_sql: CREATE SINK snk INTO t FROM MV - input: CREATE SINK snk into t AS SELECT * FROM t From 9be04806ee64fffd76d8c818f0b4a992d17c919b Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Fri, 28 Feb 2025 12:32:43 +0800 Subject: [PATCH 4/4] tolerate existing persisted buggy syntax --- src/sqlparser/src/parser.rs | 7 +++++++ src/sqlparser/tests/testdata/create.yaml | 2 ++ 2 files changed, 9 insertions(+) diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 2120d0fd76418..38c5765b29096 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3017,6 +3017,13 @@ impl Parser<'_> { const CONNECTION_REF_KEY: &str = "connection"; if name.real_value().eq_ignore_ascii_case(CONNECTION_REF_KEY) { let connection_name = self.parse_object_name()?; + // tolerate previous buggy Display that outputs `connection = connection foo` + let connection_name = match connection_name.0.as_slice() { + [ident] if ident.real_value() == CONNECTION_REF_KEY => { + self.parse_object_name()? + } + _ => connection_name, + }; SqlOptionValue::ConnectionRef(ConnectionRefValue { connection_name }) } else { self.parse_value_and_obj_ref::()? diff --git a/src/sqlparser/tests/testdata/create.yaml b/src/sqlparser/tests/testdata/create.yaml index ce5ff8d05f5e9..1e60294f649c6 100644 --- a/src/sqlparser/tests/testdata/create.yaml +++ b/src/sqlparser/tests/testdata/create.yaml @@ -140,6 +140,8 @@ formatted_sql: CREATE SINK snk FROM mv WITH (connector = 'kafka', properties.bootstrap.server = '127.0.0.1:9092', topic = 'test_topic') FORMAT UPSERT ENCODE PROTOBUF (schema.location = 'location', message = 'main_message') - input: CREATE SINK snk FROM mv WITH (connector = 'kafka', connection = my_kafka_conn, topic = 'test_topic') format plain encode json; formatted_sql: CREATE SINK snk FROM mv WITH (connector = 'kafka', connection = my_kafka_conn, topic = 'test_topic') FORMAT PLAIN ENCODE JSON +- input: CREATE SINK snk FROM mv WITH (connector = 'kafka', connection = connection my_kafka_conn, topic = 'test_topic') format plain encode json; + formatted_sql: CREATE SINK snk FROM mv WITH (connector = 'kafka', connection = my_kafka_conn, topic = 'test_topic') FORMAT PLAIN ENCODE JSON - input: CREATE SINK snk into t FROM MV formatted_sql: CREATE SINK snk INTO t FROM MV - input: CREATE SINK snk into t AS SELECT * FROM t