Skip to content

Commit

Permalink
Check _empty when convert old files or make snapshots (#1159)
Browse files Browse the repository at this point in the history
  • Loading branch information
yiguolei authored and lichaoyong committed May 17, 2019
1 parent 125cfa3 commit 205ff88
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 70 deletions.
37 changes: 34 additions & 3 deletions be/src/olap/olap_snapshot_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,9 @@ OLAPStatus OlapSnapshotConverter::convert_to_rowset_meta(const PDelta& delta,
for (auto& segment_group : delta.segment_group()) {
SegmentGroupPB* new_segment_group = extra_meta_pb->add_segment_groups();
*new_segment_group = segment_group;
if (!segment_group.empty()) {
// if segment group does not has empty property, then it is not empty
// if segment group's empty == false, then it is not empty
if (!segment_group.has_empty() || !segment_group.empty()) {
empty = false;
}
num_rows += segment_group.num_rows();
Expand Down Expand Up @@ -391,7 +393,8 @@ OLAPStatus OlapSnapshotConverter::to_alter_tablet_pb(const SchemaChangeStatusMes

// from olap header to tablet meta
OLAPStatus OlapSnapshotConverter::to_new_snapshot(const OLAPHeaderMessage& olap_header, const string& old_data_path_prefix,
const string& new_data_path_prefix, DataDir& data_dir, TabletMetaPB* tablet_meta_pb, vector<RowsetMetaPB>* pending_rowsets) {
const string& new_data_path_prefix, DataDir& data_dir, TabletMetaPB* tablet_meta_pb,
vector<RowsetMetaPB>* pending_rowsets, bool is_startup) {
RETURN_NOT_OK(to_tablet_meta_pb(olap_header, tablet_meta_pb, pending_rowsets));

TabletSchema tablet_schema;
Expand All @@ -406,6 +409,7 @@ OLAPStatus OlapSnapshotConverter::to_new_snapshot(const OLAPHeaderMessage& olap_
RETURN_NOT_OK(rowset.init());
std::vector<std::string> success_files;
RETURN_NOT_OK(rowset.convert_from_old_files(old_data_path_prefix, &success_files));
_modify_old_segment_group_id(const_cast<RowsetMetaPB&>(visible_rowset));
}

// convert inc delta file to rowsets
Expand All @@ -416,7 +420,14 @@ OLAPStatus OlapSnapshotConverter::to_new_snapshot(const OLAPHeaderMessage& olap_
AlphaRowset rowset(&tablet_schema, new_data_path_prefix, &data_dir, alpha_rowset_meta);
RETURN_NOT_OK(rowset.init());
std::vector<std::string> success_files;
RETURN_NOT_OK(rowset.convert_from_old_files(old_data_path_prefix, &success_files));
std::string inc_data_path = old_data_path_prefix;
// in clone case: there is no incremental perfix
// in start up case: there is incremental prefix
if (is_startup) {
inc_data_path = inc_data_path + "/" + INCREMENTAL_DELTA_PREFIX;
}
RETURN_NOT_OK(rowset.convert_from_old_files(inc_data_path, &success_files));
_modify_old_segment_group_id(const_cast<RowsetMetaPB&>(inc_rowset));
}

for (auto it = pending_rowsets->begin(); it != pending_rowsets->end(); ++it) {
Expand All @@ -427,11 +438,14 @@ OLAPStatus OlapSnapshotConverter::to_new_snapshot(const OLAPHeaderMessage& olap_
RETURN_NOT_OK(rowset.init());
std::vector<std::string> success_files;
// std::string pending_delta_path = old_data_path_prefix + PENDING_DELTA_PREFIX;
// if this is a pending segment group, rowset will add pending_delta_prefix when
// construct old file path
RETURN_NOT_OK(rowset.convert_from_old_files(old_data_path_prefix, &success_files));
// pending delta does not have row num, index size, data size info
// should load the pending delta, get these info and reset rowset meta's row num
// data size, index size
RETURN_NOT_OK(rowset.reset_sizeinfo());
// pending rowset not have segment group id == -1 problem, not need to modify sg id in meta
rowset.to_rowset_pb(&(*it));
}
return OLAP_SUCCESS;
Expand Down Expand Up @@ -497,4 +511,21 @@ OLAPStatus OlapSnapshotConverter::save(const string& file_path, const OLAPHeader
return OLAP_SUCCESS;
}

void OlapSnapshotConverter::_modify_old_segment_group_id(RowsetMetaPB& rowset_meta) {
if (!rowset_meta.has_alpha_rowset_extra_meta_pb()) {
return;
}
AlphaRowsetExtraMetaPB* alpha_rowset_extra_meta_pb = rowset_meta.mutable_alpha_rowset_extra_meta_pb();
for (auto& segment_group_pb : alpha_rowset_extra_meta_pb->segment_groups()) {
if (segment_group_pb.segment_group_id() == -1) {
// check if segment groups size == 1
if (alpha_rowset_extra_meta_pb->segment_groups().size() != 1) {
LOG(FATAL) << "the rowset has a segment group's id == -1 but it contains more than one segment group"
<< " it should not happen";
}
(const_cast<SegmentGroupPB&>(segment_group_pb)).set_segment_group_id(0);
}
}
}

}
6 changes: 5 additions & 1 deletion be/src/olap/olap_snapshot_converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,17 @@ class OlapSnapshotConverter {

// from olap header to tablet meta
OLAPStatus to_new_snapshot(const OLAPHeaderMessage& olap_header, const string& old_data_path_prefix,
const string& new_data_path_prefix, DataDir& data_dir, TabletMetaPB* tablet_meta_pb, vector<RowsetMetaPB>* pending_rowsets);
const string& new_data_path_prefix, DataDir& data_dir, TabletMetaPB* tablet_meta_pb,
vector<RowsetMetaPB>* pending_rowsets, bool is_startup);

// from tablet meta to olap header
OLAPStatus to_old_snapshot(const TabletMetaPB& tablet_meta_pb, string& new_data_path_prefix,
string& old_data_path_prefix, OLAPHeaderMessage* olap_header);

OLAPStatus save(const string& file_path, const OLAPHeaderMessage& olap_header);

private:
void _modify_old_segment_group_id(RowsetMetaPB& rowset_meta);
};

}
Expand Down
19 changes: 18 additions & 1 deletion be/src/olap/rowset/segment_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,14 @@ SegmentGroup::~SegmentGroup() {
}

std::string SegmentGroup::_construct_file_name(int32_t segment_id, const string& suffix) const {
// during convert from old files, the segment group id == -1, but we want to convert
// it to 0
int32_t tmp_sg_id = 0;
if (_segment_group_id > 0) {
tmp_sg_id = _segment_group_id;
}
std::string file_name = std::to_string(_rowset_id) + "_"
+ std::to_string(_segment_group_id) + "_" + std::to_string(segment_id) + suffix;
+ std::to_string(tmp_sg_id) + "_" + std::to_string(segment_id) + suffix;
return file_name;
}

Expand Down Expand Up @@ -398,6 +404,7 @@ OLAPStatus SegmentGroup::validate() {
}

bool SegmentGroup::check() {
// if the segment group is converted from old files, _empty == false but _num_segments == 0
if (_empty && (_num_segments > 0 || !zero_num_rows())) {
LOG(WARNING) << "invalid num segments for empty segment group, _num_segments:" << _num_segments
<< ",num rows:" << num_rows();
Expand Down Expand Up @@ -687,6 +694,9 @@ int64_t SegmentGroup::get_tablet_id() {

OLAPStatus SegmentGroup::make_snapshot(const std::string& snapshot_path,
std::vector<std::string>* success_links) {
if (_empty) {
return OLAP_SUCCESS;
}
for (int segment_id = 0; segment_id < _num_segments; segment_id++) {
std::string snapshot_data_file_name = construct_data_file_path(snapshot_path, segment_id);
if (!check_dir_existed(snapshot_data_file_name)) {
Expand Down Expand Up @@ -714,6 +724,10 @@ OLAPStatus SegmentGroup::make_snapshot(const std::string& snapshot_path,

OLAPStatus SegmentGroup::convert_from_old_files(const std::string& snapshot_path,
std::vector<std::string>* success_links) {
if (_empty) {
// the segment group is empty, it does not have files, just return
return OLAP_SUCCESS;
}
for (int segment_id = 0; segment_id < _num_segments; segment_id++) {
std::string new_data_file_name = construct_data_file_path(_rowset_path_prefix, segment_id);
if (!check_dir_existed(new_data_file_name)) {
Expand Down Expand Up @@ -747,6 +761,9 @@ OLAPStatus SegmentGroup::convert_from_old_files(const std::string& snapshot_path

OLAPStatus SegmentGroup::convert_to_old_files(const std::string& snapshot_path,
std::vector<std::string>* success_links) {
if (_empty) {
return OLAP_SUCCESS;
}
for (int segment_id = 0; segment_id < _num_segments; segment_id++) {
std::string new_data_file_name = construct_data_file_path(_rowset_path_prefix, segment_id);
std::string old_data_file_name = construct_old_data_file_path(snapshot_path, segment_id);
Expand Down
66 changes: 5 additions & 61 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,82 +214,30 @@ void TabletManager::cancel_unfinished_schema_change() {
uint64_t canceled_num = 0;
LOG(INFO) << "begin to cancel unfinished schema change.";

TTabletId tablet_id;
TSchemaHash schema_hash;
vector<Version> schema_change_versions;

for (const auto& tablet_instance : _tablet_map) {
for (TabletSharedPtr tablet : tablet_instance.second.table_arr) {
if (tablet == nullptr) {
LOG(WARNING) << "tablet does not exist. tablet_id=" << tablet_instance.first;
continue;
}
AlterTabletTaskSharedPtr alter_task = tablet->alter_task();
if (alter_task == nullptr) {
continue;
}

tablet_id = alter_task->related_tablet_id();
schema_hash = alter_task->related_schema_hash();
TabletSharedPtr new_tablet = get_tablet(tablet_id, schema_hash);
if (new_tablet == nullptr) {
LOG(WARNING) << "new tablet created by alter tablet does not exist. "
<< "tablet=" << tablet->full_name();
continue;
}

AlterTabletTaskSharedPtr new_alter_task = new_tablet->alter_task();
if (new_alter_task != nullptr
&& (new_alter_task->related_tablet_id() != tablet->tablet_id()
|| new_alter_task->related_schema_hash() != tablet->schema_hash())) {
LOG(WARNING) << "base tablet " << tablet->full_name()
<< " new tablet " << new_tablet->full_name()
<< " new tablet link to tablet_id " << new_alter_task->related_tablet_id()
<< " schema_hash " << new_alter_task->related_schema_hash();
continue;
}
// DORIS-3741. Upon restart, it should not clear schema change request.
if (alter_task->alter_state() == ALTER_FINISHED
&& new_alter_task != nullptr
&& new_alter_task->alter_state() == ALTER_FINISHED) {
// if alter task's state == finished, could not do anything
if (alter_task == nullptr || alter_task->alter_state() == ALTER_FINISHED) {
continue;
}

OLAPStatus res = tablet->set_alter_state(ALTER_FAILED);
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "fail to set alter state. res=" << res
<< ", base_tablet=" << tablet->full_name();
<< ", base_tablet=" << tablet->full_name();
return;
}
res = tablet->save_meta();
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "fail to save base tablet meta. res=" << res
<< ", base_tablet=" << tablet->full_name();
<< ", base_tablet=" << tablet->full_name();
return;
}
if (new_alter_task == nullptr
&& new_tablet->creation_time() < tablet->creation_time()) {
// case 1: create new tablet and save meta successfully, but failed to save alter state in base tablet
// case 2: during clear stage, clear base successfully, but failed to clear new tablet
LOG(WARNING) << "base tablet's alter task is null, skip set state"
<< " base_tablet=" << new_tablet->full_name()
<< " create_time=" << new_tablet->creation_time()
<< " new_tablet=" << tablet->full_name()
<< " create_time=" << tablet->creation_time();
} else {
res = new_tablet->set_alter_state(ALTER_FAILED);
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "fail to set alter state. res=" << res
<< ", new_tablet=" << new_tablet->full_name();
return;
}
res = new_tablet->save_meta();
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "fail to save new tablet meta. res=" << res
<< ", new_tablet=" << new_tablet->full_name();
return;
}
}

LOG(INFO) << "cancel unfinished alter tablet task. base_tablet=" << tablet->full_name();
++canceled_num;
Expand Down Expand Up @@ -562,11 +510,7 @@ OLAPStatus TabletManager::_drop_tablet_unlock(
TSchemaHash related_schema_hash = alter_task->related_schema_hash();;

// Check tablet is in schema change or not, is base tablet or not
bool is_schema_change_finished = true;
// alter finished? or alter_failed?
if (alter_state != ALTER_FINISHED) {
is_schema_change_finished = false;
}
bool is_schema_change_finished = (alter_state == ALTER_FINISHED || alter_state == ALTER_FAILED);

bool is_drop_base_tablet = false;
TabletSharedPtr related_tablet = _get_tablet_with_no_lock(
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,8 @@ OLAPStatus EngineCloneTask::_convert_to_new_snapshot(DataDir& data_dir, const st
OlapSnapshotConverter converter;
TabletMetaPB tablet_meta_pb;
vector<RowsetMetaPB> pending_rowsets;
res = converter.to_new_snapshot(olap_header_msg, clone_dir, clone_dir, data_dir, &tablet_meta_pb, &pending_rowsets);
res = converter.to_new_snapshot(olap_header_msg, clone_dir, clone_dir, data_dir, &tablet_meta_pb,
&pending_rowsets, false);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to convert snapshot to new format. dir='" << clone_dir;
return res;
Expand Down
2 changes: 1 addition & 1 deletion be/test/olap/olap_snapshot_converter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ TEST_F(OlapSnapshotConverterTest, ToNewAndToOldSnapshot) {
TabletMetaPB tablet_meta_pb;
vector<RowsetMetaPB> pending_rowsets;
OLAPStatus status = converter.to_new_snapshot(header_msg, _tablet_data_path, _tablet_data_path,
*_data_dir, &tablet_meta_pb, &pending_rowsets);
*_data_dir, &tablet_meta_pb, &pending_rowsets, true);
ASSERT_TRUE(status == OLAP_SUCCESS);

TabletSchema tablet_schema;
Expand Down
4 changes: 2 additions & 2 deletions gensrc/proto/olap_file.proto
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ enum AlterTabletState {
}

enum AlterTabletType {
SCHEMA_CHANGE = 0;
ROLLUP = 1;
SCHEMA_CHANGE = 1;
ROLLUP = 2;
}

message AlterTabletPB {
Expand Down

0 comments on commit 205ff88

Please sign in to comment.