diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 2c641858e76..3dcd5e905b1 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -100,6 +100,10 @@ std::unordered_map> FailPointHelper::f M(pause_when_altering_dt_store) \ M(pause_after_copr_streams_acquired) +#define APPLY_FOR_RANDOM_FAILPOINTS_ENABLED_OUTSIDE(M) \ + M(random_tunnel_failpoint) \ + M(random_receiver_failpoint) + namespace FailPoints { #define M(NAME) extern const char(NAME)[] = #NAME ""; @@ -107,6 +111,7 @@ APPLY_FOR_FAILPOINTS_ONCE(M) APPLY_FOR_FAILPOINTS(M) APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) +APPLY_FOR_RANDOM_FAILPOINTS_ENABLED_OUTSIDE(M) #undef M } // namespace FailPoints diff --git a/dbms/src/Common/MyTime.cpp b/dbms/src/Common/MyTime.cpp index 1b957e65484..8394a594b0c 100644 --- a/dbms/src/Common/MyTime.cpp +++ b/dbms/src/Common/MyTime.cpp @@ -992,6 +992,15 @@ int calcDayNum(int year, int month, int day) return delsum + year / 4 - temp; } +UInt64 calcSeconds(int year, int month, int day, int hour, int minute, int second) +{ + if (year == 0 && month == 0) + return 0; + Int32 current_days = calcDayNum(year, month, day); + return current_days * MyTimeBase::SECOND_IN_ONE_DAY + hour * MyTimeBase::SECOND_IN_ONE_HOUR + + minute * MyTimeBase::SECOND_IN_ONE_MINUTE + second; +} + size_t maxFormattedDateTimeStringLength(const String & format) { size_t result = 0; @@ -1142,7 +1151,7 @@ UInt64 addSeconds(UInt64 t, Int64 delta) return t; } MyDateTime my_time(t); - Int64 current_second = my_time.hour * 3600 + my_time.minute * 60 + my_time.second; + Int64 current_second = my_time.hour * MyTimeBase::SECOND_IN_ONE_HOUR + my_time.minute * MyTimeBase::SECOND_IN_ONE_MINUTE + my_time.second; current_second += delta; if (current_second >= 0) { @@ -1161,9 +1170,9 @@ UInt64 addSeconds(UInt64 t, Int64 delta) current_second += days * MyTimeBase::SECOND_IN_ONE_DAY; addDays(my_time, -days); } - my_time.hour = current_second / 3600; - my_time.minute = (current_second % 3600) / 60; - my_time.second = current_second % 60; + my_time.hour = current_second / MyTimeBase::SECOND_IN_ONE_HOUR; + my_time.minute = (current_second % MyTimeBase::SECOND_IN_ONE_HOUR) / MyTimeBase::SECOND_IN_ONE_MINUTE; + my_time.second = current_second % MyTimeBase::SECOND_IN_ONE_MINUTE; return my_time.toPackedUInt(); } diff --git a/dbms/src/Common/MyTime.h b/dbms/src/Common/MyTime.h index c146ad900f0..ecbaed60445 100644 --- a/dbms/src/Common/MyTime.h +++ b/dbms/src/Common/MyTime.h @@ -24,6 +24,9 @@ namespace DB struct MyTimeBase { static constexpr Int64 SECOND_IN_ONE_DAY = 86400; + static constexpr Int64 SECOND_IN_ONE_HOUR = 3600; + static constexpr Int64 SECOND_IN_ONE_MINUTE = 60; + // copied from https://github.com/pingcap/tidb/blob/master/types/time.go // Core time bit fields. @@ -193,6 +196,9 @@ std::pair roundTimeByFsp(time_t second, UInt64 nano_second, UInt int calcDayNum(int year, int month, int day); +// returns seconds since '0000-00-00' +UInt64 calcSeconds(int year, int month, int day, int hour, int minute, int second); + size_t maxFormattedDateTimeStringLength(const String & format); inline time_t getEpochSecond(const MyDateTime & my_time, const DateLUTImpl & time_zone) diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.cpp b/dbms/src/Flash/Coprocessor/DAGUtils.cpp index a53216ff1bb..3be0ac4f55e 100644 --- a/dbms/src/Flash/Coprocessor/DAGUtils.cpp +++ b/dbms/src/Flash/Coprocessor/DAGUtils.cpp @@ -573,7 +573,7 @@ const std::unordered_map scalar_func_map({ //{tipb::ScalarFuncSig::TimeToSec, "cast"}, //{tipb::ScalarFuncSig::TimestampAdd, "cast"}, //{tipb::ScalarFuncSig::ToDays, "cast"}, - //{tipb::ScalarFuncSig::ToSeconds, "cast"}, + {tipb::ScalarFuncSig::ToSeconds, "tidbToSeconds"}, //{tipb::ScalarFuncSig::UTCTimeWithArg, "cast"}, //{tipb::ScalarFuncSig::UTCTimestampWithoutArg, "cast"}, //{tipb::ScalarFuncSig::Timestamp1Arg, "cast"}, diff --git a/dbms/src/Flash/EstablishCall.cpp b/dbms/src/Flash/EstablishCall.cpp index 8af81e30962..e27882057c1 100644 --- a/dbms/src/Flash/EstablishCall.cpp +++ b/dbms/src/Flash/EstablishCall.cpp @@ -13,12 +13,18 @@ // limitations under the License. #include +#include #include #include #include namespace DB { +namespace FailPoints +{ +extern const char random_tunnel_failpoint[]; +} // namespace FailPoints + EstablishCallData::EstablishCallData(AsyncFlashService * service, grpc::ServerCompletionQueue * cq, grpc::ServerCompletionQueue * notify_cq, const std::shared_ptr> & is_shutdown) : service(service) , cq(cq) @@ -60,6 +66,7 @@ void EstablishCallData::tryFlushOne() void EstablishCallData::responderFinish(const grpc::Status & status) { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_tunnel_failpoint); if (*is_shutdown) finishTunnelAndResponder(); else @@ -71,6 +78,7 @@ void EstablishCallData::initRpc() std::exception_ptr eptr = nullptr; try { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_tunnel_failpoint); service->establishMPPConnectionSyncOrAsync(&ctx, &request, nullptr, this); } catch (...) diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 9639771c586..c39f00a97a2 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -15,13 +15,24 @@ #include #include #include +#include #include #include #include #include +#ifdef FIU_ENABLE +#include +#include +#endif + namespace DB { +namespace FailPoints +{ +extern const char random_receiver_failpoint[]; +} // namespace FailPoints + namespace { String getReceiverStateStr(const ExchangeReceiverState & s) @@ -470,13 +481,30 @@ void ExchangeReceiverBase::readLoop(const Request & req) recv_msg->req_info = req_info; recv_msg->source_index = req.source_index; bool success = reader->read(recv_msg->packet); + fiu_do_on(FailPoints::random_receiver_failpoint, { + // Since the code will run very frequently, then other failpoint might have no chance to trigger + // so internally low down the possibility to 1/100 + pcg64 rng(randomSeed()); + int num = std::uniform_int_distribution(0, 100)(rng); + if (num == 11) + success = false; + }); if (!success) break; has_data = true; if (recv_msg->packet->has_error()) throw Exception("Exchange receiver meet error : " + recv_msg->packet->error().msg()); - if (!msg_channel.push(std::move(recv_msg))) + bool push_success = msg_channel.push(std::move(recv_msg)); + fiu_do_on(FailPoints::random_receiver_failpoint, { + // Since the code will run very frequently, then other failpoint might have no chance to trigger + // so internally low down the possibility to 1/100 + pcg64 rng(randomSeed()); + int num = std::uniform_int_distribution(0, 100)(rng); + if (num == 71) + push_success = false; + }); + if (!push_success) { meet_error = true; auto local_state = getState(); diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp index b0e49792d1a..af65305fa42 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include @@ -50,6 +51,11 @@ struct RpcTypeTraits<::mpp::EstablishMPPConnectionRequest> namespace DB { +namespace FailPoints +{ +extern const char random_receiver_failpoint[]; +} // namespace FailPoints + namespace { struct GrpcExchangePacketReader : public ExchangePacketReader @@ -218,7 +224,9 @@ ExchangePacketReaderPtr GRPCReceiverContext::makeReader(const ExchangeRecvReques if (request.is_local) { auto [tunnel, status] = establishMPPConnectionLocal(request.req.get(), task_manager); - if (!status.ok()) + bool status_ok = status.ok(); + fiu_do_on(FailPoints::random_receiver_failpoint, status_ok = false;); + if (!status_ok) { throw Exception("Exchange receiver meet error : " + status.error_message()); } diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 826e7fea88a..c922f406dbe 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -16,15 +16,22 @@ #include #include #include +#include #include #include #include +#ifdef FIU_ENABLE +#include +#include +#endif + namespace DB { namespace FailPoints { extern const char exception_during_mpp_close_tunnel[]; +extern const char random_tunnel_failpoint[]; } // namespace FailPoints template @@ -194,7 +201,16 @@ void MPPTunnelBase::sendJob(bool need_lock) MPPDataPacketPtr res; while (send_queue.pop(res)) { - if (!writer->write(*res)) + bool write_success = writer->write(*res); + fiu_do_on(FailPoints::random_tunnel_failpoint, { + // Since the code will run very frequently, then other failpoint might have no chance to trigger + // so internally low down the possibility to 1/100 + pcg64 rng(randomSeed()); + int num = std::uniform_int_distribution(0, 100)(rng); + if (num == 17) + write_success = false; + }); + if (!write_success) { err_msg = "grpc writes failed."; break; @@ -322,6 +338,7 @@ void MPPTunnelBase::waitUntilConnectedOrFinished(std::unique_lock(); factory.registerFunction(); factory.registerFunction(); + factory.registerFunction(); factory.registerFunction(); factory.registerFunction(); diff --git a/dbms/src/Functions/FunctionsDateTime.h b/dbms/src/Functions/FunctionsDateTime.h index 3d15186b472..9ae4eeaaad9 100644 --- a/dbms/src/Functions/FunctionsDateTime.h +++ b/dbms/src/Functions/FunctionsDateTime.h @@ -3277,6 +3277,42 @@ struct TiDBWeekOfYearTransformerImpl } }; +template +struct TiDBToSecondsTransformerImpl +{ + static constexpr auto name = "tidbToSeconds"; + + static void execute(const Context & context, + const ColumnVector::Container & vec_from, + typename ColumnVector::Container & vec_to, + typename ColumnVector::Container & vec_null_map) + { + bool is_null = false; + for (size_t i = 0; i < vec_from.size(); ++i) + { + MyTimeBase val(vec_from[i]); + vec_to[i] = execute(context, val, is_null); + vec_null_map[i] = is_null; + is_null = false; + } + } + + static ToFieldType execute(const Context & context, const MyTimeBase & val, bool & is_null) + { + // TiDB returns normal value if one of month/day is zero for to_seconds function, while MySQL return null if either of them is zero. + // TiFlash aligns with MySQL to align the behavior with other functions like last_day. + if (val.month == 0 || val.day == 0) + { + context.getDAGContext()->handleInvalidTime( + fmt::format("Invalid time value: month({}) or day({}) is zero", val.month, val.day), + Errors::Types::WrongValue); + is_null = true; + return 0; + } + return static_cast(calcSeconds(val.year, val.month, val.day, val.hour, val.minute, val.second)); + } +}; + // Similar to FunctionDateOrDateTimeToSomething, but also handle nullable result and mysql sql mode. template class Transformer, bool return_nullable> class FunctionMyDateOrMyDateTimeToSomething : public IFunction @@ -3376,6 +3412,7 @@ using FunctionToLastDay = FunctionMyDateOrMyDateTimeToSomething; using FunctionToTiDBDayOfYear = FunctionMyDateOrMyDateTimeToSomething; using FunctionToTiDBWeekOfYear = FunctionMyDateOrMyDateTimeToSomething; +using FunctionToTiDBToSeconds = FunctionMyDateOrMyDateTimeToSomething; using FunctionToRelativeYearNum = FunctionDateOrDateTimeToSomething; using FunctionToRelativeQuarterNum = FunctionDateOrDateTimeToSomething; diff --git a/dbms/src/Functions/tests/gtest_toseconds.cpp b/dbms/src/Functions/tests/gtest_toseconds.cpp new file mode 100644 index 00000000000..16bceb82027 --- /dev/null +++ b/dbms/src/Functions/tests/gtest_toseconds.cpp @@ -0,0 +1,100 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include +#include + +namespace DB::tests +{ +class TestToSeconds : public DB::tests::FunctionTest +{ +}; + +TEST_F(TestToSeconds, TestAll) +try +{ + DAGContext * dag_context = context.getDAGContext(); + UInt64 ori_flags = dag_context->getFlags(); + dag_context->addFlag(TiDBSQLFlags::TRUNCATE_AS_WARNING); + /// ColumnVector(nullable) + const String func_name = "tidbToSeconds"; + static auto const nullable_datetime_type_ptr = makeNullable(std::make_shared(6)); + static auto const datetime_type_ptr = std::make_shared(6); + static auto const date_type_ptr = std::make_shared(); + auto data_col_ptr = createColumn>( + { + {}, // Null + MyDateTime(0, 0, 0, 0, 0, 0, 0).toPackedUInt(), + MyDateTime(0, 1, 1, 0, 0, 0, 0).toPackedUInt(), + MyDateTime(1969, 1, 2, 1, 1, 1, 1).toPackedUInt(), + MyDateTime(2000, 12, 31, 10, 10, 10, 700).toPackedUInt(), + MyDateTime(2022, 3, 13, 6, 7, 8, 9).toPackedUInt(), + }) + .column; + auto input_col = ColumnWithTypeAndName(data_col_ptr, nullable_datetime_type_ptr, "input"); + auto output_col = createColumn>({{}, {}, 86400, 62135773261ULL, 63145476610ULL, 63814370828ULL}); + ASSERT_COLUMN_EQ(output_col, executeFunction(func_name, input_col)); + + /// ColumnVector(non-null) + data_col_ptr = createColumn( + { + MyDateTime(0, 0, 0, 0, 0, 0, 0).toPackedUInt(), + MyDateTime(1969, 1, 2, 1, 1, 1, 1).toPackedUInt(), + MyDateTime(2000, 12, 31, 10, 10, 10, 700).toPackedUInt(), + MyDateTime(2022, 3, 13, 6, 7, 8, 9).toPackedUInt(), + }) + .column; + input_col = ColumnWithTypeAndName(data_col_ptr, datetime_type_ptr, "input"); + output_col = createColumn>({{}, 62135773261ULL, 63145476610ULL, 63814370828ULL}); + ASSERT_COLUMN_EQ(output_col, executeFunction(func_name, input_col)); + + /// ColumnConst(non-null) + input_col = ColumnWithTypeAndName(createConstColumn(1, MyDateTime(2022, 3, 13, 6, 7, 8, 9).toPackedUInt()).column, datetime_type_ptr, "input"); + output_col = createConstColumn>(1, {63814370828ULL}); + ASSERT_COLUMN_EQ(output_col, executeFunction(func_name, input_col)); + + /// ColumnConst(nullable) + input_col = ColumnWithTypeAndName(createConstColumn>(1, MyDateTime(2022, 3, 13, 6, 7, 8, 9).toPackedUInt()).column, nullable_datetime_type_ptr, "input"); + output_col = createConstColumn>(1, {63814370828ULL}); + ASSERT_COLUMN_EQ(output_col, executeFunction(func_name, input_col)); + + /// ColumnConst(nullable(null)) + input_col = ColumnWithTypeAndName(createConstColumn>(1, {}).column, nullable_datetime_type_ptr, "input"); + output_col = createConstColumn>(1, {}); + ASSERT_COLUMN_EQ(output_col, executeFunction(func_name, input_col)); + + /// MyDate ColumnVector(non-null) + data_col_ptr = createColumn( + { + MyDate(0000, 0, 1).toPackedUInt(), + MyDate(0000, 1, 1).toPackedUInt(), + MyDate(1969, 1, 1).toPackedUInt(), + MyDate(2000, 12, 1).toPackedUInt(), + MyDate(2022, 3, 14).toPackedUInt(), + }) + .column; + input_col = ColumnWithTypeAndName(data_col_ptr, date_type_ptr, "input"); + output_col = createColumn>({{}, 86400, 62135683200ULL, 63142848000ULL, 63814435200ULL}); + ASSERT_COLUMN_EQ(output_col, executeFunction(func_name, input_col)); + dag_context->setFlags(ori_flags); +} +CATCH + +} // namespace DB::tests + diff --git a/tests/fullstack-test/expr/to_seconds.test b/tests/fullstack-test/expr/to_seconds.test new file mode 100644 index 00000000000..345a6f3f11b --- /dev/null +++ b/tests/fullstack-test/expr/to_seconds.test @@ -0,0 +1,61 @@ +# Copyright 2022 PingCAP, Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +mysql> drop table if exists test.t1; +mysql> create table test.t1(c1 varchar(100), c2 datetime, c3 date); +mysql> insert into test.t1 values('', '1999-10-10 10:10:10.123', '1999-01-10'), ('200', '1999-02-10 10:10:10.123', '1999-11-10'), ('1999-01-10', '1999-10-10 10:10:10.123', '1999-01-10'); +# leap year +mysql> insert into test.t1 values('2000-2-10', '2000-2-10 10:10:10', '2000-2-10'); +# non leap year +mysql> insert into test.t1 values('2001-2-10', '2001-2-10 10:10:10', '2001-2-10'); +# zero day +mysql> insert into test.t1 values('2000-2-0', '2000-2-10 10:10:10', '2000-2-10'); +mysql> alter table test.t1 set tiflash replica 1; +func> wait_table test t1 +mysql> set @@tidb_isolation_read_engines='tiflash'; set @@tidb_enforce_mpp = 1; select c1, to_seconds(c1) from test.t1 order by 1; ++------------+----------------+ +| c1 | to_seconds(c1) | ++------------+----------------+ +| | NULL | +| 1999-01-10 | 63083145600 | +| 200 | NULL | +| 2000-2-0 | NULL | +| 2000-2-10 | 63117360000 | +| 2001-2-10 | 63148982400 | ++------------+----------------+ +mysql> set @@tidb_isolation_read_engines='tiflash'; set @@tidb_enforce_mpp = 1; select c2, to_seconds(c2) from test.t1 order by 1; ++---------------------+----------------+ +| c2 | to_seconds(c2) | ++---------------------+----------------+ +| 1999-02-10 10:10:10 | 63085860610 | +| 1999-10-10 10:10:10 | 63106769410 | +| 1999-10-10 10:10:10 | 63106769410 | +| 2000-02-10 10:10:10 | 63117396610 | +| 2000-02-10 10:10:10 | 63117396610 | +| 2001-02-10 10:10:10 | 63149019010 | ++---------------------+----------------+ +mysql> set @@tidb_isolation_read_engines='tiflash'; set @@tidb_enforce_mpp = 1; select c3, to_seconds(c3) from test.t1 order by 1; ++------------+----------------+ +| c3 | to_seconds(c3) | ++------------+----------------+ +| 1999-01-10 | 63083145600 | +| 1999-01-10 | 63083145600 | +| 1999-11-10 | 63109411200 | +| 2000-02-10 | 63117360000 | +| 2000-02-10 | 63117360000 | +| 2001-02-10 | 63148982400 | ++------------+----------------+ + +mysql> drop table if exists test.t1; +