Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FLASH-591 timezone is not considered in Arrow Encode #295

Merged
merged 16 commits into from
Oct 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/ArrowChunkCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Block ArrowChunkCodec::decode(const tipb::Chunk & chunk, const DAGSchema & schem
pos++;
}
}
Int8 field_length = getFieldLength(field.second.tp);
Int8 field_length = getFieldLengthForArrowEncode(field.second.tp);
std::vector<UInt64> offsets;
if (field_length == VAR_SIZE)
{
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <Flash/Coprocessor/DAGBlockOutputStream.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Coprocessor/DAGStringConverter.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeQuery.h>
#include <Storages/Transaction/LockException.h>
Expand Down Expand Up @@ -66,8 +67,8 @@ try

BlockOutputStreamPtr dag_output_stream = std::make_shared<DAGBlockOutputStream>(dag_response,
context.getSettings().dag_records_per_chunk,
dag_request.encode_type(),
dag.getResultFieldTypes(),
dag.getEncodeType(),
dag.getResultFieldTypes(dag.getDAGContext().void_result_ft),
streams.in->getHeader());
copyData(*streams.in, *dag_output_stream);

Expand Down
26 changes: 11 additions & 15 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,19 +268,15 @@ String DAGExpressionAnalyzer::appendTimeZoneCast(

// add timezone cast after table scan, this is used for session level timezone support
// the basic idea of supporting session level timezone is that:
// 1. for every timestamp column used in the dag request, after reading it from table scan, we add
// cast function to convert its timezone to the timezone specified in DAG request
// 2. for every timestamp column that will be returned to TiDB, we add cast function to convert its
// timezone to UTC
// for timestamp columns without any transformation or calculation(e.g. select ts_col from table),
// this will introduce two useless casts, in order to avoid these redundant cast, when cast the ts
// column to the columns with session-level timezone info, the original ts columns with UTC
// timezone are still kept
// for DAG request that does not contain agg, the final project will select the ts column with UTC
// timezone, which is exactly what TiDB want
// for DAG request that contains agg, any ts column after agg has session-level timezone info(since the ts
// column with UTC timezone will never be used in during agg), all the column with ts datatype will
// convert back to UTC timezone
// 1. for every timestamp column used in the dag request, after reading it from table scan,
// we add cast function to convert its timezone to the timezone specified in DAG request
// 2. based on the dag encode type, the return column will be with session level timezone(Arrow encode)
// or UTC timezone(Default encode), if UTC timezone is needed, another cast function is used to
// convert the session level timezone to UTC timezone.
// In the worst case(e.g select ts_col from table with Default encode), this will introduce two
// useless casts to all the timestamp columns, in order to avoid redundant cast, when cast the ts
// column to the columns with session-level timezone info, the original ts columns with UTC timezone
// are still kept, and the InterpreterDAG will choose the correct column based on encode type
bool DAGExpressionAnalyzer::appendTimeZoneCastsAfterTS(
ExpressionActionsChain & chain, std::vector<bool> is_ts_column, const tipb::DAGRequest & rqst)
{
Expand Down Expand Up @@ -311,13 +307,13 @@ bool DAGExpressionAnalyzer::appendTimeZoneCastsAfterTS(
}

void DAGExpressionAnalyzer::appendAggSelect(
ExpressionActionsChain & chain, const tipb::Aggregation & aggregation, const tipb::DAGRequest & rqst)
ExpressionActionsChain & chain, const tipb::Aggregation & aggregation, const tipb::DAGRequest & rqst, bool keep_session_timezone_info)
{
initChain(chain, getCurrentInputColumns());
bool need_update_aggregated_columns = false;
std::vector<NameAndTypePair> updated_aggregated_columns;
ExpressionActionsChain::Step step = chain.steps.back();
bool need_append_timezone_cast = hasMeaningfulTZInfo(rqst);
bool need_append_timezone_cast = !keep_session_timezone_info && hasMeaningfulTZInfo(rqst);
tipb::Expr tz_expr;
if (need_append_timezone_cast)
constructTZExpr(tz_expr, rqst, false);
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class DAGExpressionAnalyzer : private boost::noncopyable
void appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names);
void appendAggregation(ExpressionActionsChain & chain, const tipb::Aggregation & agg, Names & aggregate_keys,
AggregateDescriptions & aggregate_descriptions);
void appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & agg, const tipb::DAGRequest & rqst);
void appendAggSelect(
ExpressionActionsChain & chain, const tipb::Aggregation & agg, const tipb::DAGRequest & rqst, bool keep_session_timezone_info);
String appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String & expr_name);
void initChain(ExpressionActionsChain & chain, const std::vector<NameAndTypePair> & columns) const
{
Expand Down
12 changes: 9 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,15 @@ DAGQuerySource::DAGQuerySource(Context & context_, DAGContext & dag_context_, Re
assignOrThrowException(limit_index, i, LIMIT_NAME);
break;
default:
throw Exception("Unsupported executor in DAG request: " + dag_request.executors(i).DebugString(), ErrorCodes::NOT_IMPLEMENTED);
throw Exception(
"Unsupported executor in DAG request: " + dag_request.executors(i).DebugString(), ErrorCodes::NOT_IMPLEMENTED);
}
}
encode_type = dag_request.encode_type();
if (encode_type == tipb::EncodeType::TypeArrow && hasUnsupportedTypeForArrowEncode(getResultFieldTypes({})))
{
encode_type = tipb::EncodeType::TypeDefault;
}
}

std::tuple<std::string, ASTPtr> DAGQuerySource::parse(size_t max_query_size)
Expand Down Expand Up @@ -127,15 +133,15 @@ bool fillExecutorOutputFieldTypes(const tipb::Executor & executor, std::vector<t
}
}

std::vector<tipb::FieldType> DAGQuerySource::getResultFieldTypes() const
std::vector<tipb::FieldType> DAGQuerySource::getResultFieldTypes(const tipb::FieldType & void_result_ft) const
{
std::vector<tipb::FieldType> executor_output;
for (int i = dag_request.executors_size() - 1; i >= 0; i--)
{
if (fillExecutorOutputFieldTypes(dag_request.executors(i), executor_output))
{
if (executor_output.empty())
executor_output.push_back(dag_context.void_result_ft);
executor_output.push_back(void_result_ft);
break;
}
}
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQuerySource.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,12 @@ class DAGQuerySource : public IQuerySource
};
const tipb::DAGRequest & getDAGRequest() const { return dag_request; };

std::vector<tipb::FieldType> getResultFieldTypes() const;
std::vector<tipb::FieldType> getResultFieldTypes(const tipb::FieldType & void_result_ft) const;

ASTPtr getAST() const { return ast; };

tipb::EncodeType getEncodeType() const { return encode_type; }

protected:
void assertValid(Int32 index, const String & name) const
{
Expand All @@ -110,6 +112,8 @@ class DAGQuerySource : public IQuerySource
Int32 order_index = -1;
Int32 limit_index = -1;

tipb::EncodeType encode_type;

ASTPtr ast;
};

Expand Down
45 changes: 45 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ extern const int UNSUPPORTED_METHOD;
extern const int LOGICAL_ERROR;
} // namespace ErrorCodes

const Int8 VAR_SIZE = 0;

bool isFunctionExpr(const tipb::Expr & expr) { return expr.tp() == tipb::ExprType::ScalarFunc || isAggFunctionExpr(expr); }

const String & getAggFunctionName(const tipb::Expr & expr)
Expand Down Expand Up @@ -262,6 +264,49 @@ bool exprHasValidFieldType(const tipb::Expr & expr)
return expr.has_field_type() && !(expr.field_type().tp() == TiDB::TP::TypeNewDecimal && expr.field_type().decimal() == -1);
}

bool hasUnsupportedTypeForArrowEncode(const std::vector<tipb::FieldType> & types)
{
for (const auto & type : types)
if (type.tp() == TiDB::TypeSet || type.tp() == TiDB::TypeTime || type.tp() == TiDB::TypeEnum || type.tp() == TiDB::TypeBit)
return true;
return false;
}

UInt8 getFieldLengthForArrowEncode(Int32 tp)
{
switch (tp)
{
case TiDB::TypeTiny:
case TiDB::TypeShort:
case TiDB::TypeInt24:
case TiDB::TypeLong:
case TiDB::TypeLongLong:
case TiDB::TypeYear:
case TiDB::TypeDouble:
return 8;
case TiDB::TypeFloat:
return 4;
case TiDB::TypeDecimal:
case TiDB::TypeNewDecimal:
return 40;
case TiDB::TypeDate:
case TiDB::TypeDatetime:
case TiDB::TypeNewDate:
case TiDB::TypeTimestamp:
return 20;
case TiDB::TypeVarchar:
case TiDB::TypeVarString:
case TiDB::TypeString:
case TiDB::TypeBlob:
case TiDB::TypeTinyBlob:
case TiDB::TypeMediumBlob:
case TiDB::TypeLongBlob:
return VAR_SIZE;
default:
throw Exception("not supported field type in arrow encode: " + std::to_string(tp));
}
}

void constructStringLiteralTiExpr(tipb::Expr & expr, const String & value)
{
expr.set_tp(tipb::ExprType::String);
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ void constructInt64LiteralTiExpr(tipb::Expr & expr, Int64 value);
void constructDateTimeLiteralTiExpr(tipb::Expr & expr, UInt64 packed_value);
extern std::unordered_map<tipb::ExprType, String> agg_func_map;
extern std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map;
extern const Int8 VAR_SIZE;

tipb::FieldType columnInfoToFieldType(const TiDB::ColumnInfo & ci);
TiDB::ColumnInfo fieldTypeToColumnInfo(const tipb::FieldType & field_type);
bool hasUnsupportedTypeForArrowEncode(const std::vector<tipb::FieldType> & types);
UInt8 getFieldLengthForArrowEncode(Int32 tp);

} // namespace DB
31 changes: 26 additions & 5 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ extern const int COP_BAD_DAG_REQUEST;
} // namespace ErrorCodes

InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_)
: context(context_), dag(dag_), log(&Logger::get("InterpreterDAG"))
: context(context_),
dag(dag_),
keep_session_timezone_info(dag.getEncodeType() == tipb::EncodeType::TypeArrow),
log(&Logger::get("InterpreterDAG"))
{}

template <typename HandleType>
Expand Down Expand Up @@ -308,21 +311,39 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
});
}

addTimeZoneCastAfterTS(is_ts_column, pipeline);
if (addTimeZoneCastAfterTS(is_ts_column, pipeline))
{
// for arrow encode, the final select of timestamp column should be column with session timezone
if (keep_session_timezone_info && !dag.hasAggregation())
{
for (auto i : dag.getDAGRequest().output_offsets())
{
if (is_ts_column[i])
{
final_project[i].first = analyzer->getCurrentInputColumns()[i].name;
}
}
}
}
}

// add timezone cast for timestamp type, this is used to support session level timezone
void InterpreterDAG::addTimeZoneCastAfterTS(std::vector<bool> & is_ts_column, Pipeline & pipeline)
bool InterpreterDAG::addTimeZoneCastAfterTS(std::vector<bool> & is_ts_column, Pipeline & pipeline)
{
bool hasTSColumn = false;
for (auto b : is_ts_column)
hasTSColumn |= b;
if (!hasTSColumn)
return;
return false;

ExpressionActionsChain chain;
if (analyzer->appendTimeZoneCastsAfterTS(chain, is_ts_column, dag.getDAGRequest()))
{
pipeline.transform([&](auto & stream) { stream = std::make_shared<ExpressionBlockInputStream>(stream, chain.getLastActions()); });
return true;
}
else
return false;
}

InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
Expand All @@ -347,7 +368,7 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
chain.clear();

// add cast if type is not match
analyzer->appendAggSelect(chain, dag.getAggregation(), dag.getDAGRequest());
analyzer->appendAggSelect(chain, dag.getAggregation(), dag.getDAGRequest(), keep_session_timezone_info);
//todo use output_offset to reconstruct the final project columns
for (auto element : analyzer->getCurrentInputColumns())
{
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Coprocessor/InterpreterDAG.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class InterpreterDAG : public IInterpreter
SortDescription getSortDescription(Strings & order_column_names);
AnalysisResult analyzeExpressions();
void recordProfileStreams(Pipeline & pipeline, Int32 index);
void addTimeZoneCastAfterTS(std::vector<bool> & is_ts_column, Pipeline & pipeline);
bool addTimeZoneCastAfterTS(std::vector<bool> & is_ts_column, Pipeline & pipeline);

private:
Context & context;
Expand All @@ -101,6 +101,8 @@ class InterpreterDAG : public IInterpreter

std::unique_ptr<DAGExpressionAnalyzer> analyzer;

const bool keep_session_timezone_info;

Poco::Logger * log;
};
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/TiDBChunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ TiDBChunk::TiDBChunk(const std::vector<tipb::FieldType> & field_types)
{
for (auto & type : field_types)
{
columns.emplace_back(getFieldLength(type.tp()));
columns.emplace_back(getFieldLengthForArrowEncode(type.tp()));
}
}

Expand Down
37 changes: 1 addition & 36 deletions dbms/src/Flash/Coprocessor/TiDBColumn.h
Original file line number Diff line number Diff line change
@@ -1,49 +1,14 @@
#pragma once

#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/TiDBDecimal.h>
#include <Flash/Coprocessor/TiDBTime.h>
#include <Storages/Transaction/TiDB.h>

namespace DB
{

const Int8 VAR_SIZE = 0;
inline UInt8 getFieldLength(Int32 tp)
{
switch (tp)
{
case TiDB::TypeTiny:
case TiDB::TypeShort:
case TiDB::TypeInt24:
case TiDB::TypeLong:
case TiDB::TypeLongLong:
case TiDB::TypeYear:
case TiDB::TypeDouble:
return 8;
case TiDB::TypeFloat:
return 4;
case TiDB::TypeDecimal:
case TiDB::TypeNewDecimal:
return 40;
case TiDB::TypeDate:
case TiDB::TypeDatetime:
case TiDB::TypeNewDate:
case TiDB::TypeTimestamp:
return 20;
case TiDB::TypeVarchar:
case TiDB::TypeVarString:
case TiDB::TypeString:
case TiDB::TypeBlob:
case TiDB::TypeTinyBlob:
case TiDB::TypeMediumBlob:
case TiDB::TypeLongBlob:
return VAR_SIZE;
default:
throw Exception("not supported field type in arrow encode: " + std::to_string(tp));
}
}

class TiDBColumn
{
public:
Expand Down
17 changes: 17 additions & 0 deletions tests/mutable-test/txn_dag/time_zone.test
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@
│ 2019-06-12 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │
└────────────┴───────────────────────────┴─────────────────────┘

# test arrow encode
=> DBGInvoke dag('select * from default.test',4,'arrow',28800) " --dag_planner="optree
┌──────col_1─┬─────────────────────col_2─┬───────────────col_3─┐
│ 2019-06-10 │ 2019-06-10 17:00:00.00000 │ 2019-06-10 09:00:00 │
│ 2019-06-11 │ 2019-06-11 15:00:00.00000 │ 2019-06-11 09:00:00 │
│ 2019-06-11 │ 2019-06-11 16:00:00.00000 │ 2019-06-11 09:00:00 │
│ 2019-06-12 │ 2019-06-11 16:00:00.00000 │ 2019-06-11 09:00:00 │
└────────────┴───────────────────────────┴─────────────────────┘

=> DBGInvoke dag('select * from default.test where col_2 > col_3') " --dag_planner="optree

=> DBGInvoke dag('select * from default.test where col_2 > col_3',4,'default',28800) " --dag_planner="optree
Expand Down Expand Up @@ -70,6 +79,14 @@
│ 2019-06-10 09:00:00 │ 2019-06-10 │
└─────────────────────┴────────────┘

# ts_col in agg clause for arrow encode
=> DBGInvoke dag('select max(col_2) from default.test group by col_1',4,'arrow',28800) " --dag_planner="optree
┌──────────max(col_2)─┬──────col_1─┐
│ 2019-06-11 16:00:00 │ 2019-06-12 │
│ 2019-06-11 16:00:00 │ 2019-06-11 │
│ 2019-06-10 17:00:00 │ 2019-06-10 │
└─────────────────────┴────────────┘

# Clean up.
=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test