Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] if propNames is empty, replaceDefaultValue may wrong. #3025

Merged
merged 3 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/storage/StorageServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ void StorageServer::stop() {

webSvc_.reset();

if (txnMan_) {
txnMan_->stop();
}
if (taskMgr_) {
taskMgr_->shutdown();
}
Expand All @@ -348,10 +351,6 @@ void StorageServer::stop() {
if (adminServer_) {
adminServer_->stop();
}
if (txnMan_) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dose the close order have some effect ?

txnMan_->stop();
txnMan_.reset();
}
if (internalStorageServer_) {
internalStorageServer_->stop();
}
Expand Down
112 changes: 82 additions & 30 deletions src/storage/transaction/ChainAddEdgesProcessorLocal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ bool ChainAddEdgesProcessorLocal::prepareRequest(const cpp2::AddEdgesRequest& re
spaceVidType_ = vidType.value();
}
localPartId_ = req.get_parts().begin()->first;
// replaceNullWithDefaultValue(req_);
replaceNullWithDefaultValue(req_);
auto part = env_->kvstore_->part(spaceId_, localPartId_);
if (!nebula::ok(part)) {
pushResultCode(nebula::error(part), localPartId_);
Expand Down Expand Up @@ -425,44 +425,96 @@ std::string ChainAddEdgesProcessorLocal::makeReadableEdge(const cpp2::AddEdgesRe
* in/out edge, but they will calculate independent
* which lead to inconsistance
*
* that why we need to replace the inconsistance prone value
* that's why we need to replace the inconsistance prone value
* at the monment the request comes
* */
void ChainAddEdgesProcessorLocal::replaceNullWithDefaultValue(cpp2::AddEdgesRequest& req) {
auto& edgesOfPart = *req.parts_ref();
if (edgesOfPart.empty()) {
return;
}
auto& edgesOfFirstPart = edgesOfPart.begin()->second;
if (edgesOfFirstPart.empty()) {
return;
}
auto firstEdgeKey = edgesOfFirstPart.front().get_key();
auto edgeType = std::abs(*firstEdgeKey.edge_type_ref());
auto schema = env_->schemaMan_->getEdgeSchema(spaceId_, edgeType);

DefaultValueContext expCtx;
// the coming request has two forms
// 1st "propNames" is empty,
// which means all vals should be write as the same sequence of schema
// 2nd "propNames" is not empty
// vals of request should be write according to propName of schema
// use the following "idxVec" to identify which index a val should be write to.
std::vector<int64_t> idxVec;
auto& propNames = *req.prop_names_ref();
if (propNames.empty()) {
for (auto i = 0U; i < schema->getNumFields(); ++i) {
idxVec.emplace_back(i);
}
} else {
// first scan the origin input propNames
for (auto& name : propNames) {
int64_t index = schema->getFieldIndex(name);
idxVec.emplace_back(index);
}
// second, check if there any cols not filled but has default val
// we need to append these cols
for (auto i = 0U; i < schema->getNumFields(); ++i) {
auto it = std::find(idxVec.begin(), idxVec.end(), i);
if (it == idxVec.end()) {
auto field = schema->field(i);
if (field->hasDefault()) {
idxVec.emplace_back(i);
}
}
}
}

for (auto& part : *req.parts_ref()) {
for (auto& edge : part.second) {
auto edgeKey = edge.get_key();
auto edgeType = std::abs(*edgeKey.edge_type_ref());
auto schema = env_->schemaMan_->getEdgeSchema(spaceId_, edgeType);
auto& vals = *edge.props_ref();
for (auto i = 0U; i < schema->getNumFields(); ++i) {
std::string fieldName(schema->getFieldName(i));
auto it = std::find(propNames.begin(), propNames.end(), fieldName);
if (it == propNames.end()) {
auto field = schema->field(i);
if (field->hasDefault()) {
auto expr = field->defaultValue()->clone();
propNames.emplace_back(fieldName);
auto defVal = Expression::eval(expr, expCtx);
switch (defVal.type()) {
case Value::Type::DATE:
vals.emplace_back(defVal.getDate());
break;
case Value::Type::TIME:
vals.emplace_back(defVal.getTime());
break;
case Value::Type::DATETIME:
vals.emplace_back(defVal.getDateTime());
break;
default:
// for other type, local and remote should behavior same.
break;
}
} else {
// it's ok if this field doesn't have a default value
if (vals.size() > idxVec.size()) {
LOG(ERROR) << folly::sformat(
"error vals.size()={} > idxVec.size()={}", vals.size(), idxVec.size());
continue;
}
for (auto i = vals.size(); i < idxVec.size(); ++i) {
auto field = schema->field(idxVec[i]);
if (field->hasDefault()) {
auto expr = field->defaultValue()->clone();
auto defVal = Expression::eval(expr, expCtx);
switch (defVal.type()) {
case Value::Type::BOOL:
vals.emplace_back(defVal.getBool());
break;
case Value::Type::INT:
vals.emplace_back(defVal.getInt());
break;
case Value::Type::FLOAT:
vals.emplace_back(defVal.getFloat());
break;
case Value::Type::STRING:
vals.emplace_back(defVal.getStr());
break;
case Value::Type::DATE:
vals.emplace_back(defVal.getDate());
break;
case Value::Type::TIME:
vals.emplace_back(defVal.getTime());
break;
case Value::Type::DATETIME:
vals.emplace_back(defVal.getDateTime());
break;
default:
// for other type, local and remote should behavior same.
break;
}
} else {
// set null
vals.emplace_back(Value::kNullValue);
}
}
}
Expand Down