Skip to content

Commit

Permalink
Fix crash when prehandle snapshot (#4140) (#4143)
Browse files Browse the repository at this point in the history
close #4072
  • Loading branch information
ti-chi-bot authored Apr 25, 2022
1 parent a5c8d8f commit 4d42bb8
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 43 deletions.
13 changes: 7 additions & 6 deletions dbms/src/Storages/Transaction/DecodingStorageSchemaSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ TMTPKType getTMTPKType(const IDataType & rhs);
using SortedColumnIDWithPos = std::map<ColumnID, size_t>;
using SortedColumnIDWithPosConstIter = SortedColumnIDWithPos::const_iterator;
using TableInfo = TiDB::TableInfo;
using ColumnInfos = std::vector<const TiDB::ColumnInfo *>;
using ColumnInfo = TiDB::ColumnInfo;
using ColumnInfos = std::vector<ColumnInfo>;
struct DecodingStorageSchemaSnapshot
{
// There is a one-to-one correspondence between elements in `column_defines` and elements in `column_infos`
// Note that some columns(EXTRA_HANDLE_COLUMN, VERSION_COLUMN, TAG_COLUMN) may be be a real column in tidb schema,
// Note that some columns(EXTRA_HANDLE_COLUMN, VERSION_COLUMN, TAG_COLUMN) may not be a real column in tidb schema,
// so their corresponding elements in `column_infos` are just default-constructed placeholders and won't be used when decoding.
DM::ColumnDefinesPtr column_defines;
ColumnInfos column_infos;
Expand Down Expand Up @@ -62,7 +63,7 @@ struct DecodingStorageSchemaSnapshot
std::unordered_map<ColumnID, size_t> column_lut;
for (size_t i = 0; i < table_info_.columns.size(); i++)
{
auto & ci = table_info_.columns[i];
const auto & ci = table_info_.columns[i];
column_lut.emplace(ci.id, i);
}
for (size_t i = 0; i < column_defines->size(); i++)
Expand All @@ -72,18 +73,18 @@ struct DecodingStorageSchemaSnapshot
if (cd.id != TiDBPkColumnID && cd.id != VersionColumnID && cd.id != DelMarkColumnID)
{
auto & columns = table_info_.columns;
column_infos.push_back(&columns[column_lut.at(cd.id)]);
column_infos.push_back(columns[column_lut.at(cd.id)]);
}
else
{
column_infos.push_back(nullptr);
column_infos.push_back(ColumnInfo());
}
}

// create pk related metadata if needed
if (is_common_handle)
{
auto & primary_index_info = table_info_.getPrimaryIndexInfo();
const auto & primary_index_info = table_info_.getPrimaryIndexInfo();
for (size_t i = 0; i < primary_index_info.idx_cols.size(); i++)
{
auto pk_column_id = table_info_.columns[primary_index_info.idx_cols[i].offset].id;
Expand Down
26 changes: 12 additions & 14 deletions dbms/src/Storages/Transaction/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ static void writeRegionDataToStorage(
RegionDataReadInfoList & data_list_read,
Poco::Logger * log)
{
constexpr auto FUNCTION_NAME = __FUNCTION__;
constexpr auto FUNCTION_NAME = __FUNCTION__; // NOLINT(readability-identifier-naming)
const auto & tmt = context.getTMTContext();
TableID table_id = region->getMappedTableID();
UInt64 region_decode_cost = -1, write_part_cost = -1;

/// Declare lambda of atomic read then write to call multiple times.
auto atomicReadWrite = [&](bool force_decode) {
auto atomic_read_write = [&](bool force_decode) {
/// Get storage based on table ID.
auto storage = tmt.getStorages().get(table_id);
if (storage == nullptr || storage->isTombstone())
Expand All @@ -75,7 +75,7 @@ static void writeRegionDataToStorage(
}

Block block;
bool ok = false, need_decode = true;
bool need_decode = true;

// try to use block cache if exists
if (region.pre_decode_cache)
Expand All @@ -88,7 +88,6 @@ static void writeRegionDataToStorage(
if (region.pre_decode_cache->schema_version == schema_version)
{
block = std::move(region.pre_decode_cache->block);
ok = true;
need_decode = false;
}
else
Expand All @@ -111,8 +110,7 @@ static void writeRegionDataToStorage(
block_schema_version = decoding_schema_snapshot->schema_version;

auto reader = RegionBlockReader(decoding_schema_snapshot);
ok = reader.read(*block_ptr, data_list_read, force_decode);
if (!ok)
if (!reader.read(*block_ptr, data_list_read, force_decode))
return false;
region_decode_cost = watch.elapsedMilliseconds();
GET_METRIC(tiflash_raft_write_data_to_storage_duration_seconds, type_decode).Observe(region_decode_cost / 1000.0);
Expand Down Expand Up @@ -156,7 +154,7 @@ static void writeRegionDataToStorage(

/// Try read then write once.
{
if (atomicReadWrite(false))
if (atomic_read_write(false))
return;
}

Expand All @@ -165,7 +163,7 @@ static void writeRegionDataToStorage(
GET_METRIC(tiflash_schema_trigger_count, type_raft_decode).Increment();
tmt.getSchemaSyncer()->syncSchemas(context);

if (!atomicReadWrite(true))
if (!atomic_read_write(true))
// Failure won't be tolerated this time.
// TODO: Enrich exception message.
throw Exception("Write region " + std::to_string(region->id()) + " to table " + std::to_string(table_id) + " failed",
Expand Down Expand Up @@ -471,7 +469,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio
Int64 schema_version = DEFAULT_UNSPECIFIED_SCHEMA_VERSION;
Block res_block;

const auto atomicDecode = [&](bool force_decode) -> bool {
const auto atomic_decode = [&](bool force_decode) -> bool {
Stopwatch watch;
auto storage = tmt.getStorages().get(table_id);
if (storage == nullptr || storage->isTombstone())
Expand Down Expand Up @@ -512,12 +510,12 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio
/// decoding data. Check the test case for more details.
FAIL_POINT_PAUSE(FailPoints::pause_before_apply_raft_snapshot);

if (!atomicDecode(false))
if (!atomic_decode(false))
{
GET_METRIC(tiflash_schema_trigger_count, type_raft_decode).Increment();
tmt.getSchemaSyncer()->syncSchemas(context);

if (!atomicDecode(true))
if (!atomic_decode(true))
throw Exception("Pre-decode " + region->toString() + " cache to table " + std::to_string(table_id) + " block failed",
ErrorCodes::LOGICAL_ERROR);
}
Expand All @@ -536,7 +534,7 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt)

auto table_id = region->getMappedTableID();
auto context = tmt.getContext();
const auto atomicGet = [&](bool force_decode) -> bool {
const auto atomic_get = [&](bool force_decode) -> bool {
auto storage = tmt.getStorages().get(table_id);
if (storage == nullptr)
{
Expand All @@ -555,12 +553,12 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt)
return true;
};

if (!atomicGet(false))
if (!atomic_get(false))
{
GET_METRIC(tiflash_schema_trigger_count, type_raft_decode).Increment();
tmt.getSchemaSyncer()->syncSchemas(context);

if (!atomicGet(true))
if (!atomic_get(true))
throw Exception("Get " + region->toString() + " belonging table " + DB::toString(table_id) + " is_command_handle fail",
ErrorCodes::LOGICAL_ERROR);
}
Expand Down
10 changes: 4 additions & 6 deletions dbms/src/Storages/Transaction/RegionBlockReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

using TiDB::DatumFlat;
using TiDB::TableInfo;

RegionBlockReader::RegionBlockReader(DecodingStorageSchemaSnapshotConstPtr schema_snapshot_)
: schema_snapshot{std::move(schema_snapshot_)}
{}
Expand Down Expand Up @@ -79,7 +76,8 @@ bool RegionBlockReader::readImpl(Block & block, const RegionDataReadInfoList & d
next_column_pos++;
column_ids_iter++;
}
constexpr size_t MustHaveColCnt = 3; // extra handle, del, version
// extra handle, del, version must exist
constexpr size_t MustHaveColCnt = 3; // NOLINT(readability-identifier-naming)
if (unlikely(next_column_pos != MustHaveColCnt))
throw Exception("del, version column must exist before all other visible columns.", ErrorCodes::LOGICAL_ERROR);

Expand Down Expand Up @@ -131,9 +129,9 @@ bool RegionBlockReader::readImpl(Block & block, const RegionDataReadInfoList & d
auto next_column_pos_copy = next_column_pos;
while (column_ids_iter_copy != read_column_ids.end())
{
const auto * ci = schema_snapshot->column_infos[column_ids_iter_copy->second];
const auto & ci = schema_snapshot->column_infos[column_ids_iter_copy->second];
// when pk is handle, we can decode the pk from the key
if (!(schema_snapshot->pk_is_handle && ci->hasPriKeyFlag()))
if (!(schema_snapshot->pk_is_handle && ci.hasPriKeyFlag()))
{
auto * raw_column = const_cast<IColumn *>((block.getByPosition(next_column_pos_copy)).column.get());
raw_column->insertDefault();
Expand Down
34 changes: 17 additions & 17 deletions dbms/src/Storages/Transaction/RowCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ void encodeRowV1(const TiDB::TableInfo & table_info, const std::vector<Field> &
ErrorCodes::LOGICAL_ERROR);

size_t encoded_fields_idx = 0;
for (auto & column_info : table_info.columns)
for (const auto & column_info : table_info.columns)
{
if ((table_info.pk_is_handle || table_info.is_common_handle) && column_info.hasPriKeyFlag())
continue;
Expand Down Expand Up @@ -306,22 +306,22 @@ bool appendRowV2ToBlock(
: appendRowV2ToBlockImpl<false>(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, force_decode);
}

inline bool addDefaultValueToColumnIfPossible(const ColumnInfo * column_info, Block & block, size_t block_column_pos, bool force_decode)
inline bool addDefaultValueToColumnIfPossible(const ColumnInfo & column_info, Block & block, size_t block_column_pos, bool force_decode)
{
// We consider a missing column could be safely filled with NULL, unless it has no default value and is NOT NULL.
// This could save lots of unnecessary schema syncs for old data with a schema that has newly added columns.
// for clustered index, if the pk column does not exist, it can still be decoded from the key
if (column_info->hasPriKeyFlag())
if (column_info.hasPriKeyFlag())
return true;

if (column_info->hasNoDefaultValueFlag() && column_info->hasNotNullFlag())
if (column_info.hasNoDefaultValueFlag() && column_info.hasNotNullFlag())
{
if (!force_decode)
return false;
}
// not null or has no default value, tidb will fill with specific value.
auto * raw_column = const_cast<IColumn *>((block.getByPosition(block_column_pos)).column.get());
raw_column->insert(column_info->defaultValueToField());
raw_column->insert(column_info.defaultValueToField());
return true;
}

Expand Down Expand Up @@ -375,7 +375,7 @@ bool appendRowV2ToBlockImpl(
}
else if (column_ids_iter->first < next_datum_column_id)
{
const auto * column_info = column_infos[column_ids_iter->second];
const auto & column_info = column_infos[column_ids_iter->second];
if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, force_decode))
return false;
column_ids_iter++;
Expand All @@ -401,7 +401,7 @@ bool appendRowV2ToBlockImpl(
}

auto * raw_column = const_cast<IColumn *>((block.getByPosition(block_column_pos)).column.get());
const auto * column_info = column_infos[column_ids_iter->second];
const auto & column_info = column_infos[column_ids_iter->second];
if (is_null)
{
if (!raw_column->isColumnNullable())
Expand All @@ -412,7 +412,7 @@ bool appendRowV2ToBlockImpl(
}
else
{
throw Exception("Detected invalid null when decoding data of column " + column_info->name + " with column type " + raw_column->getName(),
throw Exception("Detected invalid null when decoding data of column " + column_info.name + " with column type " + raw_column->getName(),
ErrorCodes::LOGICAL_ERROR);
}
}
Expand All @@ -436,7 +436,7 @@ bool appendRowV2ToBlockImpl(
{
if (column_ids_iter->first != pk_handle_id)
{
const auto * column_info = column_infos[column_ids_iter->second];
const auto & column_info = column_infos[column_ids_iter->second];
if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, force_decode))
return false;
}
Expand Down Expand Up @@ -490,7 +490,7 @@ bool appendRowV1ToBlock(
}
else if (column_ids_iter->first < next_field_column_id)
{
const auto * column_info = column_infos[column_ids_iter->second];
const auto & column_info = column_infos[column_ids_iter->second];
if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, force_decode))
return false;
column_ids_iter++;
Expand All @@ -509,32 +509,32 @@ bool appendRowV1ToBlock(
}

auto * raw_column = const_cast<IColumn *>((block.getByPosition(block_column_pos)).column.get());
const auto * column_info = column_infos[column_ids_iter->second];
DatumFlat datum(decoded_field_iter->second, column_info->tp);
const auto & column_info = column_infos[column_ids_iter->second];
DatumFlat datum(decoded_field_iter->second, column_info.tp);
const Field & unflattened = datum.field();
if (datum.overflow(*column_info))
if (datum.overflow(column_info))
{
// Overflow detected, fatal if force_decode is true,
// as schema being newer and narrow shouldn't happen.
// Otherwise return false to outer, outer should sync schema and try again.
if (force_decode)
{
throw Exception("Detected overflow when decoding data " + std::to_string(unflattened.get<UInt64>()) + " of column "
+ column_info->name + " with column " + raw_column->getName(),
+ column_info.name + " with column " + raw_column->getName(),
ErrorCodes::LOGICAL_ERROR);
}

return false;
}
if (datum.invalidNull(*column_info))
if (datum.invalidNull(column_info))
{
// Null value with non-null type detected, fatal if force_decode is true,
// as schema being newer and with invalid null shouldn't happen.
// Otherwise return false to outer, outer should sync schema and try again.
if (force_decode)
{
throw Exception("Detected invalid null when decoding data " + std::to_string(unflattened.get<UInt64>())
+ " of column " + column_info->name + " with type " + raw_column->getName(),
+ " of column " + column_info.name + " with type " + raw_column->getName(),
ErrorCodes::LOGICAL_ERROR);
}

Expand All @@ -550,7 +550,7 @@ bool appendRowV1ToBlock(
{
if (column_ids_iter->first != pk_handle_id)
{
const auto * column_info = column_infos[column_ids_iter->second];
const auto & column_info = column_infos[column_ids_iter->second];
if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, force_decode))
return false;
}
Expand Down

0 comments on commit 4d42bb8

Please sign in to comment.