Skip to content

Commit

Permalink
rebuild index for new prop (#3332)
Browse files Browse the repository at this point in the history
Co-authored-by: Sophie <[email protected]>
  • Loading branch information
sworduo and Sophie-Xie authored Dec 28, 2021
1 parent 5a4a36b commit 4db974b
Show file tree
Hide file tree
Showing 14 changed files with 230 additions and 30 deletions.
37 changes: 34 additions & 3 deletions src/common/utils/IndexKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <thrift/lib/cpp2/protocol/Serializer.h>

#include "common/geo/GeoIndex.h"
#include "common/utils/DefaultValueContext.h"

namespace nebula {

Expand Down Expand Up @@ -170,25 +171,55 @@ Value IndexKeyUtils::parseIndexTTL(const folly::StringPiece& raw) {

// static
StatusOr<std::vector<std::string>> IndexKeyUtils::collectIndexValues(
RowReader* reader, const meta::cpp2::IndexItem* indexItem) {
RowReader* reader,
const meta::cpp2::IndexItem* indexItem,
const meta::SchemaProviderIf* latestSchema) {
if (reader == nullptr) {
return Status::Error("Invalid row reader");
}
auto& cols = indexItem->get_fields();
std::vector<Value> values;
for (const auto& col : cols) {
auto v = reader->getValueByName(col.get_name());
auto propName = col.get_name();
auto val = readValueWithLatestSche(reader, propName, latestSchema);
if (!val.ok()) {
LOG(ERROR) << "prop error by : " << propName << ". status : " << val.status();
return val.status();
}
auto v = val.value();
auto isNullable = col.nullable_ref().value_or(false);
auto ret = checkValue(v, isNullable);
if (!ret.ok()) {
LOG(ERROR) << "prop error by : " << col.get_name() << ". status : " << ret;
LOG(ERROR) << "prop error by : " << propName << ". status : " << ret;
return ret;
}
values.emplace_back(std::move(v));
}
return encodeValues(std::move(values), indexItem);
}

// static
StatusOr<Value> IndexKeyUtils::readValueWithLatestSche(RowReader* reader,
const std::string propName,
const meta::SchemaProviderIf* latestSchema) {
auto value = reader->getValueByName(propName);
if (latestSchema == nullptr || !value.isNull() || value.getNull() != NullType::UNKNOWN_PROP) {
return value;
}
auto field = latestSchema->field(propName);
if (field == nullptr) {
return Status::Error("Unknown prop");
}
if (field->hasDefault()) {
DefaultValueContext expCtx;
auto expr = field->defaultValue()->clone();
return Expression::eval(expr, expCtx);
} else if (field->nullable()) {
return NullType::__NULL__;
}
return Status::Error(folly::stringPrintf("Fail to read prop %s ", propName.c_str()));
}

// static
Status IndexKeyUtils::checkValue(const Value& v, bool isNullable) {
if (!v.isNull()) {
Expand Down
8 changes: 7 additions & 1 deletion src/common/utils/IndexKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -545,11 +545,17 @@ class IndexKeyUtils final {
static Value parseIndexTTL(const folly::StringPiece& raw);

static StatusOr<std::vector<std::string>> collectIndexValues(
RowReader* reader, const meta::cpp2::IndexItem* indexItem);
RowReader* reader,
const meta::cpp2::IndexItem* indexItem,
const meta::SchemaProviderIf* latestSchema = nullptr);

private:
IndexKeyUtils() = delete;

static StatusOr<Value> readValueWithLatestSche(RowReader* reader,
const std::string propName,
const meta::SchemaProviderIf* latestSchema);

static Status checkValue(const Value& v, bool isNullable);
};

Expand Down
2 changes: 1 addition & 1 deletion src/storage/admin/RebuildEdgeIndexTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac

for (const auto& item : items) {
if (item->get_schema_id().get_edge_type() == edgeType) {
auto valuesRet = IndexKeyUtils::collectIndexValues(reader.get(), item.get());
auto valuesRet = IndexKeyUtils::collectIndexValues(reader.get(), item.get(), schema);
if (!valuesRet.ok()) {
LOG(WARNING) << "Collect index value failed";
continue;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/admin/RebuildTagIndexTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space

for (const auto& item : items) {
if (item->get_schema_id().get_tag_id() == tagID) {
auto valuesRet = IndexKeyUtils::collectIndexValues(reader.get(), item.get());
auto valuesRet = IndexKeyUtils::collectIndexValues(reader.get(), item.get(), schema);
if (!valuesRet.ok()) {
LOG(WARNING) << "Collect index value failed";
continue;
Expand Down
4 changes: 2 additions & 2 deletions src/storage/exec/UpdateNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ class UpdateTagNode : public UpdateNode<VertexID> {
const VertexID& vId,
RowReader* reader,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get());
auto values = IndexKeyUtils::collectIndexValues(reader, index.get(), schema_);
if (!values.ok()) {
return {};
}
Expand Down Expand Up @@ -752,7 +752,7 @@ class UpdateEdgeNode : public UpdateNode<cpp2::EdgeKey> {
RowReader* reader,
const cpp2::EdgeKey& edgeKey,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get());
auto values = IndexKeyUtils::collectIndexValues(reader, index.get(), schema_);
if (!values.ok()) {
return {};
}
Expand Down
13 changes: 7 additions & 6 deletions src/storage/mutate/AddEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) {
* step 1 , Delete old version index if exists.
*/
if (oReader != nullptr) {
auto ois = indexKeys(partId, oReader.get(), key, index);
auto ois = indexKeys(partId, oReader.get(), key, index, schema.get());
if (!ois.empty()) {
// Check the index is building for the specified partition or not.
auto indexState = env_->getIndexState(spaceId_, partId);
Expand All @@ -276,7 +276,7 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) {
* step 2 , Insert new edge index
*/
if (nReader != nullptr) {
auto niks = indexKeys(partId, nReader.get(), key, index);
auto niks = indexKeys(partId, nReader.get(), key, index, schema.get());
if (!niks.empty()) {
auto v = CommonUtils::ttlValue(schema.get(), nReader.get());
auto niv = v.ok() ? IndexKeyUtils::indexVal(std::move(v).value()) : "";
Expand Down Expand Up @@ -384,7 +384,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> AddEdgesProcessor::addEdges(
}

if (!val.empty()) {
auto ois = indexKeys(partId, oReader.get(), e.first, index);
auto ois = indexKeys(partId, oReader.get(), e.first, index, schema.get());
if (!ois.empty()) {
// Check the index is building for the specified partition or not.
auto indexState = env_->getIndexState(spaceId_, partId);
Expand Down Expand Up @@ -416,7 +416,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> AddEdgesProcessor::addEdges(
}
}

auto niks = indexKeys(partId, nReader.get(), e.first, index);
auto niks = indexKeys(partId, nReader.get(), e.first, index, schema.get());
if (!niks.empty()) {
auto v = CommonUtils::ttlValue(schema.get(), nReader.get());
auto niv = v.ok() ? IndexKeyUtils::indexVal(std::move(v).value()) : "";
Expand Down Expand Up @@ -473,8 +473,9 @@ std::vector<std::string> AddEdgesProcessor::indexKeys(
PartitionID partId,
RowReader* reader,
const folly::StringPiece& rawKey,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get());
std::shared_ptr<nebula::meta::cpp2::IndexItem> index,
const meta::SchemaProviderIf* latestSchema) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get(), latestSchema);
if (!values.ok()) {
return {};
}
Expand Down
3 changes: 2 additions & 1 deletion src/storage/mutate/AddEdgesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class AddEdgesProcessor : public BaseProcessor<cpp2::ExecResponse> {
std::vector<std::string> indexKeys(PartitionID partId,
RowReader* reader,
const folly::StringPiece& rawKey,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index);
std::shared_ptr<nebula::meta::cpp2::IndexItem> index,
const meta::SchemaProviderIf* latestSchema);

private:
GraphSpaceID spaceId_;
Expand Down
9 changes: 5 additions & 4 deletions src/storage/mutate/AddVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
* step 1 , Delete old version index if exists.
*/
if (oReader != nullptr) {
auto ois = indexKeys(partId, vid, oReader.get(), index);
auto ois = indexKeys(partId, vid, oReader.get(), index, schema.get());
if (!ois.empty()) {
// Check the index is building for the specified partition or
// not.
Expand All @@ -256,7 +256,7 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
* step 2 , Insert new vertex index
*/
if (nReader != nullptr) {
auto niks = indexKeys(partId, vid, nReader.get(), index);
auto niks = indexKeys(partId, vid, nReader.get(), index, schema.get());
if (!niks.empty()) {
auto v = CommonUtils::ttlValue(schema.get(), nReader.get());
auto niv = v.ok() ? IndexKeyUtils::indexVal(std::move(v).value()) : "";
Expand Down Expand Up @@ -334,8 +334,9 @@ std::vector<std::string> AddVerticesProcessor::indexKeys(
PartitionID partId,
const VertexID& vId,
RowReader* reader,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get());
std::shared_ptr<nebula::meta::cpp2::IndexItem> index,
const meta::SchemaProviderIf* latestSchema) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get(), latestSchema);
if (!values.ok()) {
return {};
}
Expand Down
3 changes: 2 additions & 1 deletion src/storage/mutate/AddVerticesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class AddVerticesProcessor : public BaseProcessor<cpp2::ExecResponse> {
std::vector<std::string> indexKeys(PartitionID partId,
const VertexID& vId,
RowReader* reader,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index);
std::shared_ptr<nebula::meta::cpp2::IndexItem> index,
const meta::SchemaProviderIf* latestSchema);

private:
GraphSpaceID spaceId_;
Expand Down
4 changes: 3 additions & 1 deletion src/storage/mutate/DeleteEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> DeleteEdgesProcessor::deleteEdges(
auto key = NebulaKeyUtils::edgeKey(spaceVidLen_, partId, srcId, type, rank, dstId);
std::string val;
auto ret = env_->kvstore_->get(spaceId_, partId, key, &val);
auto schema = env_->schemaMan_->getEdgeSchema(spaceId_, std::abs(type));

if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
/**
Expand All @@ -162,7 +163,8 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> DeleteEdgesProcessor::deleteEdges(
return nebula::cpp2::ErrorCode::E_INVALID_DATA;
}
}
auto valuesRet = IndexKeyUtils::collectIndexValues(reader.get(), index.get());
auto valuesRet =
IndexKeyUtils::collectIndexValues(reader.get(), index.get(), schema.get());
if (!valuesRet.ok()) {
continue;
}
Expand Down
4 changes: 3 additions & 1 deletion src/storage/mutate/DeleteVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> DeleteVerticesProcessor::deleteVer
}
target.emplace_back(std::move(l));
}
auto schema = env_->schemaMan_->getTagSchema(spaceId_, tagId);
RowReaderWrapper reader;
for (auto& index : indexes_) {
if (index->get_schema_id().get_tag_id() == tagId) {
Expand All @@ -147,7 +148,8 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> DeleteVerticesProcessor::deleteVer
return nebula::cpp2::ErrorCode::E_INVALID_DATA;
}
}
auto valuesRet = IndexKeyUtils::collectIndexValues(reader.get(), index.get());
auto valuesRet =
IndexKeyUtils::collectIndexValues(reader.get(), index.get(), schema.get());
if (!valuesRet.ok()) {
continue;
}
Expand Down
14 changes: 8 additions & 6 deletions src/tools/db-upgrade/DbUpgrader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ void UpgraderSpace::encodeVertexValue(PartitionID partId,
return;
}
for (auto& index : it->second) {
auto newIndexKeys = indexVertexKeys(partId, strVid, nReader.get(), index);
auto newIndexKeys = indexVertexKeys(partId, strVid, nReader.get(), index, schema);
for (auto& newIndexKey : newIndexKeys) {
data.emplace_back(std::move(newIndexKey), "");
}
Expand Down Expand Up @@ -997,8 +997,9 @@ std::vector<std::string> UpgraderSpace::indexVertexKeys(
PartitionID partId,
VertexID& vId,
RowReader* reader,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get());
std::shared_ptr<nebula::meta::cpp2::IndexItem> index,
const meta::SchemaProviderIf* latestSchema) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get(), latestSchema);
if (!values.ok()) {
return {};
}
Expand Down Expand Up @@ -1039,7 +1040,7 @@ void UpgraderSpace::encodeEdgeValue(PartitionID partId,
return;
}
for (auto& index : it->second) {
auto newIndexKeys = indexEdgeKeys(partId, nReader.get(), svId, rank, dstId, index);
auto newIndexKeys = indexEdgeKeys(partId, nReader.get(), svId, rank, dstId, index, schema);
for (auto& newIndexKey : newIndexKeys) {
data.emplace_back(std::move(newIndexKey), "");
}
Expand All @@ -1053,8 +1054,9 @@ std::vector<std::string> UpgraderSpace::indexEdgeKeys(
VertexID& svId,
EdgeRanking rank,
VertexID& dstId,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get());
std::shared_ptr<nebula::meta::cpp2::IndexItem> index,
const meta::SchemaProviderIf* latestSchema) {
auto values = IndexKeyUtils::collectIndexValues(reader, index.get(), latestSchema);
if (!values.ok()) {
return {};
}
Expand Down
6 changes: 4 additions & 2 deletions src/tools/db-upgrade/DbUpgrader.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ class UpgraderSpace {
std::vector<std::string> indexVertexKeys(PartitionID partId,
VertexID& vId,
RowReader* reader,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index);
std::shared_ptr<nebula::meta::cpp2::IndexItem> index,
const meta::SchemaProviderIf* latestSchema);

void encodeEdgeValue(PartitionID partId,
RowReader* reader,
Expand All @@ -104,7 +105,8 @@ class UpgraderSpace {
VertexID& svId,
EdgeRanking rank,
VertexID& dstId,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index);
std::shared_ptr<nebula::meta::cpp2::IndexItem> index,
const meta::SchemaProviderIf* latestSchema);

WriteResult convertValue(const meta::NebulaSchemaProvider* newSchema,
const meta::SchemaProviderIf* oldSchema,
Expand Down
Loading

0 comments on commit 4db974b

Please sign in to comment.