Skip to content

Commit b20846e

Browse files
authored
add support for unary and binary values in values list, update docs (#1172)
* add support for unary and binary values in values list * add explain * fix clippy
1 parent 09748a2 commit b20846e

File tree

6 files changed

+178
-78
lines changed

6 files changed

+178
-78
lines changed

README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ DataFusion also includes a simple command-line interactive SQL utility. See the
161161
- [x] Common math functions
162162
- [x] cast
163163
- [x] try_cast
164+
- [x] [`VALUES` lists](https://www.postgresql.org/docs/current/queries-values.html)
164165
- Postgres compatible String functions
165166
- [x] ascii
166167
- [x] bit_length
@@ -193,7 +194,7 @@ DataFusion also includes a simple command-line interactive SQL utility. See the
193194
- Miscellaneous/Boolean functions
194195
- [x] nullif
195196
- Approximation functions
196-
- [ ] approx_distinct
197+
- [x] approx_distinct
197198
- Common date/time functions
198199
- [ ] Basic date functions
199200
- [ ] Basic time functions

datafusion/src/physical_plan/values.rs

+19-5
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::physical_plan::{
2424
Partitioning, PhysicalExpr,
2525
};
2626
use crate::scalar::ScalarValue;
27+
use arrow::array::new_null_array;
2728
use arrow::datatypes::SchemaRef;
2829
use arrow::record_batch::RecordBatch;
2930
use async_trait::async_trait;
@@ -48,20 +49,33 @@ impl ValuesExec {
4849
if data.is_empty() {
4950
return Err(DataFusionError::Plan("Values list cannot be empty".into()));
5051
}
51-
// we have this empty batch as a placeholder to satisfy evaluation argument
52-
let batch = RecordBatch::new_empty(schema.clone());
5352
let n_row = data.len();
5453
let n_col = schema.fields().len();
54+
// we have this single row, null, typed batch as a placeholder to satisfy evaluation argument
55+
let batch = RecordBatch::try_new(
56+
schema.clone(),
57+
schema
58+
.fields()
59+
.iter()
60+
.map(|field| new_null_array(field.data_type(), 1))
61+
.collect::<Vec<_>>(),
62+
)?;
5563
let arr = (0..n_col)
5664
.map(|j| {
5765
(0..n_row)
5866
.map(|i| {
5967
let r = data[i][j].evaluate(&batch);
6068
match r {
6169
Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar),
62-
Ok(ColumnarValue::Array(_)) => Err(DataFusionError::Plan(
63-
"Cannot have array values in a values list".into(),
64-
)),
70+
Ok(ColumnarValue::Array(a)) if a.len() == 1 => {
71+
ScalarValue::try_from_array(&a, 0)
72+
}
73+
Ok(ColumnarValue::Array(a)) => {
74+
Err(DataFusionError::Plan(format!(
75+
"Cannot have array values {:?} in a values list",
76+
a
77+
)))
78+
}
6579
Err(err) => Err(err),
6680
}
6781
})

datafusion/src/sql/planner.rs

+99-71
Original file line numberDiff line numberDiff line change
@@ -1069,24 +1069,107 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
10691069
}
10701070
}
10711071

1072+
fn parse_sql_binary_op(
1073+
&self,
1074+
left: &SQLExpr,
1075+
op: &BinaryOperator,
1076+
right: &SQLExpr,
1077+
schema: &DFSchema,
1078+
) -> Result<Expr> {
1079+
let operator = match *op {
1080+
BinaryOperator::Gt => Ok(Operator::Gt),
1081+
BinaryOperator::GtEq => Ok(Operator::GtEq),
1082+
BinaryOperator::Lt => Ok(Operator::Lt),
1083+
BinaryOperator::LtEq => Ok(Operator::LtEq),
1084+
BinaryOperator::Eq => Ok(Operator::Eq),
1085+
BinaryOperator::NotEq => Ok(Operator::NotEq),
1086+
BinaryOperator::Plus => Ok(Operator::Plus),
1087+
BinaryOperator::Minus => Ok(Operator::Minus),
1088+
BinaryOperator::Multiply => Ok(Operator::Multiply),
1089+
BinaryOperator::Divide => Ok(Operator::Divide),
1090+
BinaryOperator::Modulo => Ok(Operator::Modulo),
1091+
BinaryOperator::And => Ok(Operator::And),
1092+
BinaryOperator::Or => Ok(Operator::Or),
1093+
BinaryOperator::Like => Ok(Operator::Like),
1094+
BinaryOperator::NotLike => Ok(Operator::NotLike),
1095+
BinaryOperator::PGRegexMatch => Ok(Operator::RegexMatch),
1096+
BinaryOperator::PGRegexIMatch => Ok(Operator::RegexIMatch),
1097+
BinaryOperator::PGRegexNotMatch => Ok(Operator::RegexNotMatch),
1098+
BinaryOperator::PGRegexNotIMatch => Ok(Operator::RegexNotIMatch),
1099+
_ => Err(DataFusionError::NotImplemented(format!(
1100+
"Unsupported SQL binary operator {:?}",
1101+
op
1102+
))),
1103+
}?;
1104+
1105+
Ok(Expr::BinaryExpr {
1106+
left: Box::new(self.sql_expr_to_logical_expr(left, schema)?),
1107+
op: operator,
1108+
right: Box::new(self.sql_expr_to_logical_expr(right, schema)?),
1109+
})
1110+
}
1111+
1112+
fn parse_sql_unary_op(
1113+
&self,
1114+
op: &UnaryOperator,
1115+
expr: &SQLExpr,
1116+
schema: &DFSchema,
1117+
) -> Result<Expr> {
1118+
match op {
1119+
UnaryOperator::Not => Ok(Expr::Not(Box::new(
1120+
self.sql_expr_to_logical_expr(expr, schema)?,
1121+
))),
1122+
UnaryOperator::Plus => Ok(self.sql_expr_to_logical_expr(expr, schema)?),
1123+
UnaryOperator::Minus => {
1124+
match expr {
1125+
// optimization: if it's a number literal, we apply the negative operator
1126+
// here directly to calculate the new literal.
1127+
SQLExpr::Value(Value::Number(n,_)) => match n.parse::<i64>() {
1128+
Ok(n) => Ok(lit(-n)),
1129+
Err(_) => Ok(lit(-n
1130+
.parse::<f64>()
1131+
.map_err(|_e| {
1132+
DataFusionError::Internal(format!(
1133+
"negative operator can be only applied to integer and float operands, got: {}",
1134+
n))
1135+
})?)),
1136+
},
1137+
// not a literal, apply negative operator on expression
1138+
_ => Ok(Expr::Negative(Box::new(self.sql_expr_to_logical_expr(expr, schema)?))),
1139+
}
1140+
}
1141+
_ => Err(DataFusionError::NotImplemented(format!(
1142+
"Unsupported SQL unary operator {:?}",
1143+
op
1144+
))),
1145+
}
1146+
}
1147+
10721148
fn sql_values_to_plan(&self, values: &SQLValues) -> Result<LogicalPlan> {
1149+
// values should not be based on any other schema
1150+
let schema = DFSchema::empty();
10731151
let values = values
10741152
.0
10751153
.iter()
10761154
.map(|row| {
10771155
row.iter()
10781156
.map(|v| match v {
1079-
SQLExpr::Value(Value::Number(n, _)) => match n.parse::<i64>() {
1080-
Ok(n) => Ok(lit(n)),
1081-
Err(_) => Ok(lit(n.parse::<f64>().unwrap())),
1082-
},
1157+
SQLExpr::Value(Value::Number(n, _)) => parse_sql_number(n),
10831158
SQLExpr::Value(Value::SingleQuotedString(ref s)) => {
10841159
Ok(lit(s.clone()))
10851160
}
10861161
SQLExpr::Value(Value::Null) => {
10871162
Ok(Expr::Literal(ScalarValue::Utf8(None)))
10881163
}
10891164
SQLExpr::Value(Value::Boolean(n)) => Ok(lit(*n)),
1165+
SQLExpr::UnaryOp { ref op, ref expr } => {
1166+
self.parse_sql_unary_op(op, expr, &schema)
1167+
}
1168+
SQLExpr::BinaryOp {
1169+
ref left,
1170+
ref op,
1171+
ref right,
1172+
} => self.parse_sql_binary_op(left, op, right, &schema),
10901173
other => Err(DataFusionError::NotImplemented(format!(
10911174
"Unsupported value {:?} in a values list expression",
10921175
other
@@ -1100,14 +1183,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
11001183

11011184
fn sql_expr_to_logical_expr(&self, sql: &SQLExpr, schema: &DFSchema) -> Result<Expr> {
11021185
match sql {
1103-
SQLExpr::Value(Value::Number(n, _)) => match n.parse::<i64>() {
1104-
Ok(n) => Ok(lit(n)),
1105-
Err(_) => Ok(lit(n.parse::<f64>().unwrap())),
1106-
},
1186+
SQLExpr::Value(Value::Number(n, _)) => parse_sql_number(n),
11071187
SQLExpr::Value(Value::SingleQuotedString(ref s)) => Ok(lit(s.clone())),
1108-
11091188
SQLExpr::Value(Value::Boolean(n)) => Ok(lit(*n)),
1110-
11111189
SQLExpr::Value(Value::Null) => Ok(Expr::Literal(ScalarValue::Utf8(None))),
11121190
SQLExpr::Extract { field, expr } => Ok(Expr::ScalarFunction {
11131191
fun: functions::BuiltinScalarFunction::DatePart,
@@ -1244,34 +1322,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
12441322
right: Box::new(self.sql_expr_to_logical_expr(right, schema)?),
12451323
}),
12461324

1247-
SQLExpr::UnaryOp { ref op, ref expr } => match op {
1248-
UnaryOperator::Not => Ok(Expr::Not(Box::new(
1249-
self.sql_expr_to_logical_expr(expr, schema)?,
1250-
))),
1251-
UnaryOperator::Plus => Ok(self.sql_expr_to_logical_expr(expr, schema)?),
1252-
UnaryOperator::Minus => {
1253-
match expr.as_ref() {
1254-
// optimization: if it's a number literal, we apply the negative operator
1255-
// here directly to calculate the new literal.
1256-
SQLExpr::Value(Value::Number(n,_)) => match n.parse::<i64>() {
1257-
Ok(n) => Ok(lit(-n)),
1258-
Err(_) => Ok(lit(-n
1259-
.parse::<f64>()
1260-
.map_err(|_e| {
1261-
DataFusionError::Internal(format!(
1262-
"negative operator can be only applied to integer and float operands, got: {}",
1263-
n))
1264-
})?)),
1265-
},
1266-
// not a literal, apply negative operator on expression
1267-
_ => Ok(Expr::Negative(Box::new(self.sql_expr_to_logical_expr(expr, schema)?))),
1268-
}
1269-
}
1270-
_ => Err(DataFusionError::NotImplemented(format!(
1271-
"Unsupported SQL unary operator {:?}",
1272-
op
1273-
))),
1274-
},
1325+
SQLExpr::UnaryOp { ref op, ref expr } => {
1326+
self.parse_sql_unary_op(op, expr, schema)
1327+
}
12751328

12761329
SQLExpr::Between {
12771330
ref expr,
@@ -1306,39 +1359,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
13061359
ref left,
13071360
ref op,
13081361
ref right,
1309-
} => {
1310-
let operator = match *op {
1311-
BinaryOperator::Gt => Ok(Operator::Gt),
1312-
BinaryOperator::GtEq => Ok(Operator::GtEq),
1313-
BinaryOperator::Lt => Ok(Operator::Lt),
1314-
BinaryOperator::LtEq => Ok(Operator::LtEq),
1315-
BinaryOperator::Eq => Ok(Operator::Eq),
1316-
BinaryOperator::NotEq => Ok(Operator::NotEq),
1317-
BinaryOperator::Plus => Ok(Operator::Plus),
1318-
BinaryOperator::Minus => Ok(Operator::Minus),
1319-
BinaryOperator::Multiply => Ok(Operator::Multiply),
1320-
BinaryOperator::Divide => Ok(Operator::Divide),
1321-
BinaryOperator::Modulo => Ok(Operator::Modulo),
1322-
BinaryOperator::And => Ok(Operator::And),
1323-
BinaryOperator::Or => Ok(Operator::Or),
1324-
BinaryOperator::Like => Ok(Operator::Like),
1325-
BinaryOperator::NotLike => Ok(Operator::NotLike),
1326-
BinaryOperator::PGRegexMatch => Ok(Operator::RegexMatch),
1327-
BinaryOperator::PGRegexIMatch => Ok(Operator::RegexIMatch),
1328-
BinaryOperator::PGRegexNotMatch => Ok(Operator::RegexNotMatch),
1329-
BinaryOperator::PGRegexNotIMatch => Ok(Operator::RegexNotIMatch),
1330-
_ => Err(DataFusionError::NotImplemented(format!(
1331-
"Unsupported SQL binary operator {:?}",
1332-
op
1333-
))),
1334-
}?;
1335-
1336-
Ok(Expr::BinaryExpr {
1337-
left: Box::new(self.sql_expr_to_logical_expr(left, schema)?),
1338-
op: operator,
1339-
right: Box::new(self.sql_expr_to_logical_expr(right, schema)?),
1340-
})
1341-
}
1362+
} => self.parse_sql_binary_op(left, op, right, schema),
13421363

13431364
SQLExpr::Trim { expr, trim_where } => {
13441365
let (fun, where_expr) = match trim_where {
@@ -3630,3 +3651,10 @@ mod tests {
36303651
quick_test(sql, expected);
36313652
}
36323653
}
3654+
3655+
fn parse_sql_number(n: &str) -> Result<Expr> {
3656+
match n.parse::<i64>() {
3657+
Ok(n) => Ok(lit(n)),
3658+
Err(_) => Ok(lit(n.parse::<f64>().unwrap())),
3659+
}
3660+
}

datafusion/tests/sql.rs

+38
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,30 @@ async fn select_values_list() -> Result<()> {
491491
];
492492
assert_batches_eq!(expected, &actual);
493493
}
494+
{
495+
let sql = "VALUES (-1)";
496+
let actual = execute_to_batches(&mut ctx, sql).await;
497+
let expected = vec![
498+
"+---------+",
499+
"| column1 |",
500+
"+---------+",
501+
"| -1 |",
502+
"+---------+",
503+
];
504+
assert_batches_eq!(expected, &actual);
505+
}
506+
{
507+
let sql = "VALUES (2+1,2-1,2>1)";
508+
let actual = execute_to_batches(&mut ctx, sql).await;
509+
let expected = vec![
510+
"+---------+---------+---------+",
511+
"| column1 | column2 | column3 |",
512+
"+---------+---------+---------+",
513+
"| 3 | 1 | true |",
514+
"+---------+---------+---------+",
515+
];
516+
assert_batches_eq!(expected, &actual);
517+
}
494518
{
495519
let sql = "VALUES";
496520
let plan = ctx.create_logical_plan(sql);
@@ -647,6 +671,20 @@ async fn select_values_list() -> Result<()> {
647671
];
648672
assert_batches_eq!(expected, &actual);
649673
}
674+
{
675+
let sql = "EXPLAIN VALUES (1, 'a', -1, 1.1),(NULL, 'b', -3, 0.5)";
676+
let actual = execute_to_batches(&mut ctx, sql).await;
677+
let expected = vec![
678+
"+---------------+-----------------------------------------------------------------------------------------------------------+",
679+
"| plan_type | plan |",
680+
"+---------------+-----------------------------------------------------------------------------------------------------------+",
681+
"| logical_plan | Values: (Int64(1), Utf8(\"a\"), Int64(-1), Float64(1.1)), (Int64(NULL), Utf8(\"b\"), Int64(-3), Float64(0.5)) |",
682+
"| physical_plan | ValuesExec |",
683+
"| | |",
684+
"+---------------+-----------------------------------------------------------------------------------------------------------+",
685+
];
686+
assert_batches_eq!(expected, &actual);
687+
}
650688
Ok(())
651689
}
652690

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
-- Licensed to the Apache Software Foundation (ASF) under one
2+
-- or more contributor license agreements. See the NOTICE file
3+
-- distributed with this work for additional information
4+
-- regarding copyright ownership. The ASF licenses this file
5+
-- to you under the Apache License, Version 2.0 (the
6+
-- "License"); you may not use this file except in compliance
7+
-- with the License. You may obtain a copy of the License at
8+
9+
-- http://www.apache.org/licenses/LICENSE-2.0
10+
11+
-- Unless required by applicable law or agreed to in writing, software
12+
-- distributed under the License is distributed on an "AS IS" BASIS,
13+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
-- See the License for the specific language governing permissions and
15+
-- limitations under the License.
16+
17+
SELECT * FROM
18+
(VALUES (1,2.0,-3,1+1),(10,20.0,-30,2+2))
19+
AS tbl(int_col, float_col, negative_col, summation);

integration-tests/test_psql_parity.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def generate_csv_from_psql(fname: str):
7777

7878
class TestPsqlParity:
7979
def test_tests_count(self):
80-
assert len(test_files) == 15, "tests are missed"
80+
assert len(test_files) == 16, "tests are missed"
8181

8282
@pytest.mark.parametrize("fname", test_files)
8383
def test_sql_file(self, fname):

0 commit comments

Comments
 (0)