Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove panics in datafusion-common::scalar by making more operations return Result #7901

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/common/src/pyarrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl FromPyArrow for ScalarValue {

impl ToPyArrow for ScalarValue {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let array = self.to_array();
let array = self.to_array()?;
// convert to pyarrow array using C data interface
let pyarray = array.to_data().to_pyarrow(py)?;
let pyscalar = pyarray.call_method1(py, "__getitem__", (0,))?;
Expand Down
598 changes: 351 additions & 247 deletions datafusion/common/src/scalar.rs

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion datafusion/core/benches/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,15 @@ fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("to_array_of_size 100000", |b| {
let scalar = ScalarValue::Int32(Some(100));

b.iter(|| assert_eq!(scalar.to_array_of_size(100000).null_count(), 0))
b.iter(|| {
assert_eq!(
scalar
.to_array_of_size(100000)
.expect("Failed to convert to array of size")
.null_count(),
0
)
})
});
}

Expand Down
5 changes: 4 additions & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,10 @@ async fn prune_partitions(
// Applies `filter` to `batch` returning `None` on error
let do_filter = |filter| -> Option<ArrayRef> {
let expr = create_physical_expr(filter, &df_schema, &schema, &props).ok()?;
Some(expr.evaluate(&batch).ok()?.into_array(partitions.len()))
expr.evaluate(&batch)
.ok()?
.into_array(partitions.len())
.ok()
};

// Compute the conjunction of the filters, ignoring errors
Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ impl PartitionColumnProjector {
&mut self.key_buffer_cache,
partition_value.as_ref(),
file_batch.num_rows(),
),
)?,
)
}

Expand Down Expand Up @@ -396,11 +396,11 @@ fn create_dict_array<T>(
dict_val: &ScalarValue,
len: usize,
data_type: DataType,
) -> ArrayRef
) -> Result<ArrayRef>
where
T: ArrowNativeType,
{
let dict_vals = dict_val.to_array();
let dict_vals = dict_val.to_array()?;

let sliced_key_buffer = buffer_gen.get_buffer(len);

Expand All @@ -409,16 +409,16 @@ where
.len(len)
.add_buffer(sliced_key_buffer);
builder = builder.add_child_data(dict_vals.to_data());
Arc::new(DictionaryArray::<UInt16Type>::from(
Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
builder.build().unwrap(),
))
)))
}

fn create_output_array(
key_buffer_cache: &mut ZeroBufferGenerators,
val: &ScalarValue,
len: usize,
) -> ArrayRef {
) -> Result<ArrayRef> {
if let ScalarValue::Dictionary(key_type, dict_val) = &val {
match key_type.as_ref() {
DataType::Int8 => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl ArrowPredicate for DatafusionArrowPredicate {
match self
.physical_expr
.evaluate(&batch)
.map(|v| v.into_array(batch.num_rows()))
.and_then(|v| v.into_array(batch.num_rows()))
{
Ok(array) => {
let bool_arr = as_boolean_array(&array)?.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ macro_rules! get_min_max_values {
.flatten()
// column either didn't have statistics at all or didn't have min/max values
.or_else(|| Some(null_scalar.clone()))
.map(|s| s.to_array())
.and_then(|s| s.to_array().ok())
}}
}

Expand All @@ -425,7 +425,7 @@ macro_rules! get_null_count_values {
},
);

Some(value.to_array())
value.to_array().ok()
}};
}

Expand Down
14 changes: 9 additions & 5 deletions datafusion/expr/src/columnar_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use arrow::array::ArrayRef;
use arrow::array::NullArray;
use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;
use datafusion_common::{Result, ScalarValue};
use std::sync::Arc;

/// Represents the result of evaluating an expression: either a single
Expand All @@ -47,11 +47,15 @@ impl ColumnarValue {

/// Convert a columnar value into an ArrayRef. [`Self::Scalar`] is
/// converted by repeating the same scalar multiple times.
pub fn into_array(self, num_rows: usize) -> ArrayRef {
match self {
///
/// # Errors
///
/// Errors if `self` is a Scalar that fails to be converted into an array of size
pub fn into_array(self, num_rows: usize) -> Result<ArrayRef> {
Ok(match self {
ColumnarValue::Array(array) => array,
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
}
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?,
})
}

/// null columnar values are implemented as a null array in order to pass batch
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/window_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl WindowAggState {
}

pub fn new(out_type: &DataType) -> Result<Self> {
let empty_out_col = ScalarValue::try_from(out_type)?.to_array_of_size(0);
let empty_out_col = ScalarValue::try_from(out_type)?.to_array_of_size(0)?;
Ok(Self {
window_frame_range: Range { start: 0, end: 0 },
window_frame_ctx: None,
Expand Down
8 changes: 6 additions & 2 deletions datafusion/optimizer/src/unwrap_cast_in_comparison.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1089,8 +1089,12 @@ mod tests {
// Verify that calling the arrow
// cast kernel yields the same results
// input array
let literal_array = literal.to_array_of_size(1);
let expected_array = expected_value.to_array_of_size(1);
let literal_array = literal
.to_array_of_size(1)
.expect("Failed to convert to array of size");
let expected_array = expected_value
.to_array_of_size(1)
.expect("Failed to convert to array of size");
let cast_array = cast_with_options(
&literal_array,
&target_type,
Expand Down
12 changes: 8 additions & 4 deletions datafusion/physical-expr/src/aggregate/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,13 +505,17 @@ mod tests {

let values1 = expr1
.iter()
.map(|e| e.evaluate(batch1))
.map(|r| r.map(|v| v.into_array(batch1.num_rows())))
.map(|e| {
e.evaluate(batch1)
.and_then(|v| v.into_array(batch1.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
let values2 = expr2
.iter()
.map(|e| e.evaluate(batch2))
.map(|r| r.map(|v| v.into_array(batch2.num_rows())))
.map(|e| {
e.evaluate(batch2)
.and_then(|v| v.into_array(batch2.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
Expand Down
12 changes: 8 additions & 4 deletions datafusion/physical-expr/src/aggregate/covariance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,13 +754,17 @@ mod tests {

let values1 = expr1
.iter()
.map(|e| e.evaluate(batch1))
.map(|r| r.map(|v| v.into_array(batch1.num_rows())))
.map(|e| {
e.evaluate(batch1)
.and_then(|v| v.into_array(batch1.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
let values2 = expr2
.iter()
.map(|e| e.evaluate(batch2))
.map(|r| r.map(|v| v.into_array(batch2.num_rows())))
.map(|e| {
e.evaluate(batch2)
.and_then(|v| v.into_array(batch2.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
Expand Down
10 changes: 8 additions & 2 deletions datafusion/physical-expr/src/aggregate/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,10 @@ mod tests {
let mut states = vec![];

for idx in 0..state1.len() {
states.push(concat(&[&state1[idx].to_array(), &state2[idx].to_array()])?);
states.push(concat(&[
&state1[idx].to_array()?,
&state2[idx].to_array()?,
])?);
}

let mut first_accumulator =
Expand All @@ -614,7 +617,10 @@ mod tests {
let mut states = vec![];

for idx in 0..state1.len() {
states.push(concat(&[&state1[idx].to_array(), &state2[idx].to_array()])?);
states.push(concat(&[
&state1[idx].to_array()?,
&state2[idx].to_array()?,
])?);
}

let mut last_accumulator =
Expand Down
12 changes: 8 additions & 4 deletions datafusion/physical-expr/src/aggregate/stddev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,13 +445,17 @@ mod tests {

let values1 = expr1
.iter()
.map(|e| e.evaluate(batch1))
.map(|r| r.map(|v| v.into_array(batch1.num_rows())))
.map(|e| {
e.evaluate(batch1)
.and_then(|v| v.into_array(batch1.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
let values2 = expr2
.iter()
.map(|e| e.evaluate(batch2))
.map(|r| r.map(|v| v.into_array(batch2.num_rows())))
.map(|e| {
e.evaluate(batch2)
.and_then(|v| v.into_array(batch2.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/aggregate/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ use std::sync::Arc;
pub fn get_accum_scalar_values_as_arrays(
accum: &dyn Accumulator,
) -> Result<Vec<ArrayRef>> {
Ok(accum
accum
.state()?
.iter()
.map(|s| s.to_array_of_size(1))
.collect::<Vec<_>>())
.collect::<Result<Vec<_>>>()
}

/// Computes averages for `Decimal128`/`Decimal256` values, checking for overflow
Expand Down
12 changes: 8 additions & 4 deletions datafusion/physical-expr/src/aggregate/variance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,13 +519,17 @@ mod tests {

let values1 = expr1
.iter()
.map(|e| e.evaluate(batch1))
.map(|r| r.map(|v| v.into_array(batch1.num_rows())))
.map(|e| {
e.evaluate(batch1)
.and_then(|v| v.into_array(batch1.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
let values2 = expr2
.iter()
.map(|e| e.evaluate(batch2))
.map(|r| r.map(|v| v.into_array(batch2.num_rows())))
.map(|e| {
e.evaluate(batch2)
.and_then(|v| v.into_array(batch2.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/conditional_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if value.is_null() {
continue;
} else {
let last_value = value.to_array_of_size(size);
let last_value = value.to_array_of_size(size)?;
current_value =
zip(&remainder, &last_value, current_value.as_ref())?;
break;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ pub fn date_part(args: &[ColumnarValue]) -> Result<ColumnarValue> {

let array = match array {
ColumnarValue::Array(array) => array.clone(),
ColumnarValue::Scalar(scalar) => scalar.to_array(),
ColumnarValue::Scalar(scalar) => scalar.to_array()?,
};

let arr = match date_part.to_lowercase().as_str() {
Expand Down
Loading