Skip to content

Commit 32b1456

Browse files
authored
[feature-wip](array) remove array config and check array nested depth (#13428)
1. Remove the FE config `enable_array_type`. 2. Limit the nested depth of arrays on the FE side. 3. Fix a bug where, when loading an array from parquet, the decimal type was treated as bigint. 4. Fix loading arrays from csv (vec-engine): handle both null and "null". 5. Change the csv array loading behavior: if the array string format in the csv is invalid, it is converted to null. 6. Remove `check_array_format()`, because its logic is wrong and meaningless. 7. Add stream load csv test cases and more parquet broker load tests.
1 parent 1892e8f commit 32b1456

File tree

55 files changed

+4455
-200
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+4455
-200
lines changed

be/src/runtime/decimalv2_value.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,6 @@ int DecimalV2Value::parse_from_str(const char* decimal_str, int32_t length) {
354354

355355
_value = StringParser::string_to_decimal<__int128>(decimal_str, length, PRECISION, SCALE,
356356
&result);
357-
358357
if (result == StringParser::PARSE_FAILURE) {
359358
error = E_DEC_BAD_NUM;
360359
}

be/src/vec/data_types/data_type_array.cpp

+27-12
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "vec/data_types/data_type_array.h"
2222

2323
#include "gen_cpp/data.pb.h"
24+
#include "util/stack_util.h"
2425
#include "vec/columns/column_array.h"
2526
#include "vec/columns/column_nullable.h"
2627
#include "vec/data_types/data_type_nullable.h"
@@ -175,15 +176,21 @@ std::string DataTypeArray::to_string(const IColumn& column, size_t row_num) cons
175176
}
176177

177178
Status DataTypeArray::from_string(ReadBuffer& rb, IColumn* column) const {
179+
DCHECK(!rb.eof());
178180
// only support one level now
179181
auto* array_column = assert_cast<ColumnArray*>(column);
180182
auto& offsets = array_column->get_offsets();
181183

182184
IColumn& nested_column = array_column->get_data();
185+
DCHECK(nested_column.is_nullable());
183186
if (*rb.position() != '[') {
184187
return Status::InvalidArgument("Array does not start with '[' character, found '{}'",
185188
*rb.position());
186189
}
190+
if (*(rb.end() - 1) != ']') {
191+
return Status::InvalidArgument("Array does not end with ']' character, found '{}'",
192+
*(rb.end() - 1));
193+
}
187194
++rb.position();
188195
bool first = true;
189196
size_t size = 0;
@@ -210,13 +217,9 @@ Status DataTypeArray::from_string(ReadBuffer& rb, IColumn* column) const {
210217

211218
// dispose the case of [123,,,]
212219
if (nested_str_len == 0) {
213-
if (nested_column.is_nullable()) {
214-
auto& nested_null_col = reinterpret_cast<ColumnNullable&>(nested_column);
215-
nested_null_col.get_nested_column().insert_default();
216-
nested_null_col.get_null_map_data().push_back(0);
217-
} else {
218-
nested_column.insert_default();
219-
}
220+
auto& nested_null_col = reinterpret_cast<ColumnNullable&>(nested_column);
221+
nested_null_col.get_nested_column().insert_default();
222+
nested_null_col.get_null_map_data().push_back(0);
220223
++size;
221224
continue;
222225
}
@@ -236,19 +239,31 @@ Status DataTypeArray::from_string(ReadBuffer& rb, IColumn* column) const {
236239
}
237240

238241
// dispose the case of ["123"] or ['123']
242+
bool has_quota = false;
243+
size_t tmp_len = nested_str_len;
239244
ReadBuffer read_buffer(rb.position(), nested_str_len);
240245
auto begin_char = *(rb.position() + begin_pos);
241246
auto end_char = *(rb.position() + end_pos);
242247
if (begin_char == end_char && (begin_char == '"' || begin_char == '\'')) {
243248
int64_t length = end_pos - begin_pos - 1;
244249
read_buffer = ReadBuffer(rb.position() + begin_pos + 1, (length > 0 ? length : 0));
250+
tmp_len = (length > 0 ? length : 0);
251+
has_quota = true;
245252
}
246253

247-
auto st = nested->from_string(read_buffer, &nested_column);
248-
if (!st.ok()) {
249-
// we should do revert if error
250-
array_column->pop_back(size);
251-
return st;
254+
// handle null, need to distinguish null and "null"
255+
if (!has_quota && tmp_len == 4 && strncmp(read_buffer.position(), "null", 4) == 0) {
256+
// insert null
257+
auto& nested_null_col = reinterpret_cast<ColumnNullable&>(nested_column);
258+
nested_null_col.get_nested_column().insert_default();
259+
nested_null_col.get_null_map_data().push_back(1);
260+
} else {
261+
auto st = nested->from_string(read_buffer, &nested_column);
262+
if (!st.ok()) {
263+
// we should do revert if error
264+
array_column->pop_back(size);
265+
return st;
266+
}
252267
}
253268
rb.position() += nested_str_len;
254269
DCHECK_LE(rb.position(), rb.end());

be/src/vec/exec/format/csv/csv_reader.cpp

-6
Original file line numberDiff line numberDiff line change
@@ -237,12 +237,6 @@ Status CsvReader::_fill_dest_columns(const Slice& line, std::vector<MutableColum
237237
return Status::OK();
238238
}
239239

240-
RETURN_IF_ERROR(_check_array_format(_split_values, &is_success));
241-
if (UNLIKELY(!is_success)) {
242-
// If not success, which means we met an invalid row, filter this row and return.
243-
return Status::OK();
244-
}
245-
246240
// if _split_values.size > _file_slot_descs.size()
247241
// we only take the first few columns
248242
for (int i = 0; i < _file_slot_descs.size(); ++i) {

be/src/vec/exec/format/parquet/schema_desc.cpp

+26-21
Original file line numberDiff line numberDiff line change
@@ -157,31 +157,35 @@ void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physic
157157

158158
TypeDescriptor FieldDescriptor::get_doris_type(const tparquet::SchemaElement& physical_schema) {
159159
TypeDescriptor type;
160-
switch (physical_schema.type) {
161-
case tparquet::Type::BOOLEAN:
162-
type.type = TYPE_BOOLEAN;
163-
return type;
164-
case tparquet::Type::INT32:
165-
type.type = TYPE_INT;
166-
return type;
167-
case tparquet::Type::INT64:
168-
case tparquet::Type::INT96:
169-
type.type = TYPE_BIGINT;
170-
return type;
171-
case tparquet::Type::FLOAT:
172-
type.type = TYPE_FLOAT;
173-
return type;
174-
case tparquet::Type::DOUBLE:
175-
type.type = TYPE_DOUBLE;
176-
return type;
177-
default:
178-
break;
179-
}
160+
type.type = INVALID_TYPE;
180161
if (physical_schema.__isset.logicalType) {
181162
type = convert_to_doris_type(physical_schema.logicalType);
182163
} else if (physical_schema.__isset.converted_type) {
183164
type = convert_to_doris_type(physical_schema.converted_type);
184165
}
166+
// use physical type instead
167+
if (type.type == INVALID_TYPE) {
168+
switch (physical_schema.type) {
169+
case tparquet::Type::BOOLEAN:
170+
type.type = TYPE_BOOLEAN;
171+
return type;
172+
case tparquet::Type::INT32:
173+
type.type = TYPE_INT;
174+
return type;
175+
case tparquet::Type::INT64:
176+
case tparquet::Type::INT96:
177+
type.type = TYPE_BIGINT;
178+
return type;
179+
case tparquet::Type::FLOAT:
180+
type.type = TYPE_FLOAT;
181+
return type;
182+
case tparquet::Type::DOUBLE:
183+
type.type = TYPE_DOUBLE;
184+
return type;
185+
default:
186+
break;
187+
}
188+
}
185189
return type;
186190
}
187191

@@ -214,7 +218,7 @@ TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType logi
214218
} else if (logicalType.__isset.TIMESTAMP) {
215219
type.type = TYPE_DATETIMEV2;
216220
} else {
217-
LOG(WARNING) << "Not supported parquet LogicalType";
221+
type.type = INVALID_TYPE;
218222
}
219223
return type;
220224
}
@@ -253,6 +257,7 @@ TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::ConvertedType::t
253257
break;
254258
default:
255259
LOG(WARNING) << "Not supported parquet ConvertedType: " << convertedType;
260+
type = INVALID_TYPE;
256261
break;
257262
}
258263
return type;

be/src/vec/exec/scan/vfile_scanner.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,7 @@ Status VFileScanner::_get_next_reader() {
509509
_name_to_col_type.clear();
510510
_missing_cols.clear();
511511
_cur_reader->get_columns(&_name_to_col_type, &_missing_cols);
512-
if (!_missing_cols.empty() && _is_load && VLOG_NOTICE_IS_ON) {
512+
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
513513
fmt::memory_buffer col_buf;
514514
for (auto& col : _missing_cols) {
515515
fmt::format_to(col_buf, " {}", col);

be/src/vec/exec/scan/vscanner.cpp

-3
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,6 @@ Status VScanner::close(RuntimeState* state) {
120120
}
121121

122122
void VScanner::_update_counters_before_close() {
123-
LOG(INFO) << "cmy _update_counters_before_close: _counter.num_rows_filtered: "
124-
<< _counter.num_rows_filtered
125-
<< ", _counter.num_rows_unselected: " << _counter.num_rows_unselected;
126123
if (!_state->enable_profile() && !_is_load) return;
127124
COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read);
128125
// Update stats for load

be/src/vec/exec/vbroker_scanner.cpp

+4-3
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,10 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line,
9191
return Status::OK();
9292
}
9393

94-
if (!check_array_format(_split_values)) {
95-
return Status::OK();
96-
}
94+
// This check is meaningless, should be removed
95+
// if (!check_array_format(_split_values)) {
96+
// return Status::OK();
97+
// }
9798

9899
int idx = 0;
99100
for (int i = 0; i < _split_values.size(); ++i) {

be/src/vec/functions/function_cast.h

+13-4
Original file line numberDiff line numberDiff line change
@@ -320,10 +320,12 @@ struct ConvertImplGenericFromString {
320320
check_and_get_column<StringColumnType>(&col_from)) {
321321
auto col_to = data_type_to->create_column();
322322

323-
//IColumn & col_to = *res;
324323
size_t size = col_from.size();
325324
col_to->reserve(size);
326325

326+
ColumnUInt8::MutablePtr col_null_map_to = ColumnUInt8::create(size);
327+
ColumnUInt8::Container* vec_null_map_to = &col_null_map_to->get_data();
328+
327329
for (size_t i = 0; i < size; ++i) {
328330
const auto& val = col_from_string->get_data_at(i);
329331
// Note: here we should handle the null element
@@ -332,9 +334,15 @@ struct ConvertImplGenericFromString {
332334
continue;
333335
}
334336
ReadBuffer read_buffer((char*)(val.data), val.size);
335-
RETURN_IF_ERROR(data_type_to->from_string(read_buffer, col_to));
337+
Status st = data_type_to->from_string(read_buffer, col_to);
338+
// if parsing failed, will return null
339+
(*vec_null_map_to)[i] = !st.ok();
340+
if (!st.ok()) {
341+
col_to->insert_default();
342+
}
336343
}
337-
block.replace_by_position(result, std::move(col_to));
344+
block.get_by_position(result).column =
345+
ColumnNullable::create(std::move(col_to), std::move(col_null_map_to));
338346
} else {
339347
return Status::RuntimeError(
340348
"Illegal column {} of first argument of conversion function from string",
@@ -870,8 +878,9 @@ struct ConvertThroughParsing {
870878
if constexpr (IsDataTypeDecimal<ToDataType>) {
871879
UInt32 scale = additions;
872880
col_to = ColVecTo::create(size, scale);
873-
} else
881+
} else {
874882
col_to = ColVecTo::create(size);
883+
}
875884

876885
typename ColVecTo::Container& vec_to = col_to->get_data();
877886

be/src/vec/io/reader_buffer.h

+2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ class ReadBuffer {
3535

3636
size_t count() { return _end - _start; }
3737

38+
std::string to_string() { return std::string(_start, (_end - _start)); }
39+
3840
private:
3941
char* _start;
4042
char* _end;

docs/zh-CN/docs/sql-manual/sql-reference/Data-Types/ARRAY.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ under the License.
2828

2929
### description
3030

31-
ARRAY\<T\>
31+
`ARRAY<T>`
3232

3333
由T类型元素组成的数组,不能作为key列使用。目前支持在Duplicate模型的表中使用。
3434

@@ -74,7 +74,7 @@ PROPERTIES (
7474
mysql> INSERT INTO `array_test` VALUES (1, [1,2,3,4,5]);
7575
mysql> INSERT INTO `array_test` VALUES (2, array(6,7,8)), (3, array()), (4, null);
7676
```
77-
注意:以上sql仅在非向量化场景下,支持array()函数,向量化场景不支持。
77+
注意:以上sql仅在非向量化场景下,支持 array() 函数,向量化场景不支持。
7878

7979
查询数据示例:
8080

fe/check/checkstyle/checkstyle.xml

-4
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,6 @@ under the License.
5757
<property name="lineSeparator" value="lf"/>
5858
</module>
5959

60-
<module name="RegexpSingleline">
61-
<property name="format" value="&gt;&gt;&gt;&gt;&gt;&gt;&gt;"/>
62-
<property name="message" value="Merge conflicts unresolved."/>
63-
</module>
6460
<module name="RegexpSingleline">
6561
<property name="format" value="&lt;&lt;&lt;&lt;&lt;&lt;&lt;"/>
6662
<property name="message" value="Merge conflicts unresolved."/>

fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ public void analyze(boolean isOlap) throws AnalysisException {
267267
if (type.getPrimitiveType() == PrimitiveType.ARRAY) {
268268
if (isKey()) {
269269
throw new AnalysisException("Array can only be used in the non-key column of"
270-
+ " the duplicate table at present.");
270+
+ " the duplicate table at present.");
271271
}
272272
if (defaultValue.isSet && defaultValue != DefaultValue.NULL_DEFAULT_VALUE) {
273273
throw new AnalysisException("Array type column default value only support null");

fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,9 @@ public void analyze(Analyzer analyzer) throws UserException {
328328
if (columnDef.getType().getPrimitiveType() == PrimitiveType.JSONB) {
329329
break;
330330
}
331+
if (columnDef.getType().isCollectionType()) {
332+
break;
333+
}
331334
if (columnDef.getType().getPrimitiveType() == PrimitiveType.VARCHAR) {
332335
keysColumnNames.add(columnDef.getName());
333336
break;
@@ -393,9 +396,6 @@ public void analyze(Analyzer analyzer) throws UserException {
393396
columnDef.analyze(engineName.equals("olap"));
394397

395398
if (columnDef.getType().isArrayType()) {
396-
if (!Config.enable_array_type) {
397-
throw new AnalysisException("Please open enable_array_type config before use Array.");
398-
}
399399
if (columnDef.getAggregateType() != null && columnDef.getAggregateType() != AggregateType.NONE) {
400400
throw new AnalysisException("Array column can't support aggregation "
401401
+ columnDef.getAggregateType());

fe/fe-core/src/main/java/org/apache/doris/catalog/ArrayType.java

+2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
*/
3434
public class ArrayType extends Type {
3535

36+
public static final int MAX_NESTED_DEPTH = 9;
37+
3638
@SerializedName(value = "itemType")
3739
private Type itemType;
3840

fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java

+7-9
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,8 @@
4646
* as abstract methods that subclasses must implement.
4747
*/
4848
public abstract class Type {
49-
// Maximum nesting depth of a type. This limit was determined experimentally by
50-
// org.apache.doris.rewrite.FoldConstantsRule.apply generating and scanning
51-
// deeply nested Parquet and Avro files. In those experiments, we exceeded
52-
// the stack space in the scanner (which uses recursion for dealing with
53-
// nested types) at a nesting depth between 200 and 300 (200 worked, 300 crashed).
54-
public static int MAX_NESTING_DEPTH = 2;
49+
// Currently only support Array type with max 9 depths.
50+
public static int MAX_NESTING_DEPTH = 9;
5551

5652
// Static constant types for scalar types that don't require additional information.
5753
public static final ScalarType INVALID = new ScalarType(PrimitiveType.INVALID_TYPE);
@@ -488,7 +484,7 @@ public static boolean canCastTo(Type t1, Type t2) {
488484
} else if (t1.isArrayType() && t2.isArrayType()) {
489485
return ArrayType.canCastTo((ArrayType) t1, (ArrayType) t2);
490486
}
491-
return t1.isNull() || t1.getPrimitiveType() == PrimitiveType.VARCHAR;
487+
return t1.isNull() || t1.getPrimitiveType().isCharFamily();
492488
}
493489

494490
/**
@@ -612,7 +608,7 @@ public boolean exceedsMaxNestingDepth() {
612608
* MAP<STRING,STRUCT<f1:INT>> --> 3
613609
*/
614610
private boolean exceedsMaxNestingDepth(int d) {
615-
if (d >= MAX_NESTING_DEPTH) {
611+
if (d > MAX_NESTING_DEPTH) {
616612
return true;
617613
}
618614
if (isStructType()) {
@@ -623,7 +619,9 @@ private boolean exceedsMaxNestingDepth(int d) {
623619
}
624620
}
625621
} else if (isArrayType()) {
626-
return false;
622+
ArrayType arrayType = (ArrayType) this;
623+
Type itemType = arrayType.getItemType();
624+
return itemType.exceedsMaxNestingDepth(d + 1);
627625
} else if (isMultiRowType()) {
628626
MultiRowType multiRowType = (MultiRowType) this;
629627
return multiRowType.getItemType().exceedsMaxNestingDepth(d + 1);

fe/fe-core/src/test/java/org/apache/doris/analysis/ColumnDefTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.doris.catalog.ScalarType;
2626
import org.apache.doris.catalog.Type;
2727
import org.apache.doris.common.AnalysisException;
28-
import org.apache.doris.common.Config;
2928
import org.apache.doris.qe.ConnectContext;
3029

3130
import mockit.Mock;
@@ -47,7 +46,6 @@ public void setUp() {
4746
stringCol = new TypeDef(ScalarType.createChar(10));
4847
floatCol = new TypeDef(ScalarType.createType(PrimitiveType.FLOAT));
4948
booleanCol = new TypeDef(ScalarType.createType(PrimitiveType.BOOLEAN));
50-
Config.enable_array_type = true;
5149

5250
ctx = new ConnectContext(null);
5351
new MockUp<ConnectContext>() {

0 commit comments

Comments
 (0)