Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add explicit join #5664

Merged
merged 10 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .linters/cpp/checkKeyword.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@
'KW_GEOGRAPHY',
'KW_DURATION',
'KW_ACROSS',
'KW_JOIN',
'KW_LEFT',
'KW_RIGHT',
'KW_INNER',
'KW_OUTER',
'KW_SEMI',
'KW_ANTI',
]


Expand Down
8 changes: 4 additions & 4 deletions src/common/expression/test/FunctionCallExpressionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ TEST_F(FunctionCallExpressionTest, FunctionCallTest) {
}
{
TEST_FUNCTION(substr, args_["substr"], "cdef");
TEST_FUNCTION(left, args_["side"], "abcde");
TEST_FUNCTION(right, args_["side"], "mnopq");
TEST_FUNCTION(left, args_["neg_side"], Value::kNullValue);
TEST_FUNCTION(right, args_["neg_side"], Value::kNullValue);
TEST_FUNCTION(`left`, args_["side"], "abcde");
TEST_FUNCTION(`right`, args_["side"], "mnopq");
TEST_FUNCTION(`left`, args_["neg_side"], Value::kNullValue);
TEST_FUNCTION(`right`, args_["neg_side"], Value::kNullValue);

TEST_FUNCTION(lpad, args_["pad"], "1231abcdefghijkl");
TEST_FUNCTION(rpad, args_["pad"], "abcdefghijkl1231");
Expand Down
1 change: 1 addition & 0 deletions src/graph/context/ast/QueryAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ struct PathContext final : AstContext {
std::string toVidsVar;

bool isShortest{false};
bool singleShortest{false};
bool isWeight{false};
bool noLoop{false};
bool withProp{false};
Expand Down
48 changes: 37 additions & 11 deletions src/graph/executor/algo/BFSShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ folly::Future<Status> BFSShortestPathExecutor::execute() {
// MemoryTrackerVerified
SCOPED_TIMER(&execTime_);
pathNode_ = asNode<BFSShortestPath>(node());
singleShortest_ = pathNode_->singleShortest();
terminateEarlyVar_ = pathNode_->terminateEarlyVar();

if (step_ == 1) {
Expand Down Expand Up @@ -43,16 +44,25 @@ folly::Future<Status> BFSShortestPathExecutor::execute() {
futures.emplace_back(std::move(leftFuture));
futures.emplace_back(std::move(rightFuture));

return folly::collect(futures)
return folly::collectAll(futures)
.via(runner())
.thenValue([this](auto&& status) {
.thenValue([this](std::vector<folly::Try<Status>>&& resps) {
memory::MemoryCheckGuard guard;
UNUSED(status);
for (auto& respVal : resps) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
}
return conjunctPath();
})
.thenValue([this](auto&& status) {
memory::MemoryCheckGuard guard;
UNUSED(status);
NG_RETURN_IF_ERROR(status);
step_++;
DataSet ds;
ds.colNames = pathNode_->colNames();
Expand Down Expand Up @@ -166,13 +176,26 @@ folly::Future<Status> BFSShortestPathExecutor::conjunctPath() {
}
}

return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) {
memory::MemoryCheckGuard guard;
for (auto& resp : resps) {
currentDs_.append(std::move(resp));
}
return Status::OK();
});
return folly::collectAll(futures).via(runner()).thenValue(
[this](std::vector<folly::Try<DataSet>>&& resps) {
memory::MemoryCheckGuard guard;
for (auto& respVal : resps) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
auto resp = std::move(respVal).value();
currentDs_.append(std::move(resp));
}
if (singleShortest_) {
currentDs_.rows.resize(1);
}
return Status::OK();
});
}

DataSet BFSShortestPathExecutor::doConjunct(const std::vector<Value>& meetVids,
Expand All @@ -189,6 +212,9 @@ DataSet BFSShortestPathExecutor::doConjunct(const std::vector<Value>& meetVids,
Row row;
row.emplace_back(std::move(result));
ds.rows.emplace_back(std::move(row));
if (singleShortest_) {
return ds;
}
}
}
return ds;
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/algo/BFSShortestPathExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class BFSShortestPathExecutor final : public Executor {

private:
const BFSShortestPath* pathNode_{nullptr};
bool singleShortest_{false};
size_t step_{1};
HashSet leftVisitedVids_;
HashSet rightVisitedVids_;
Expand Down
92 changes: 69 additions & 23 deletions src/graph/executor/algo/MultiShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ folly::Future<Status> MultiShortestPathExecutor::execute() {
SCOPED_TIMER(&execTime_);
pathNode_ = asNode<MultiShortestPath>(node());
terminationVar_ = pathNode_->terminationVar();
singleShortest_ = pathNode_->singleShortest();

if (step_ == 1) {
init();
Expand All @@ -28,12 +29,21 @@ folly::Future<Status> MultiShortestPathExecutor::execute() {
futures.emplace_back(std::move(leftFuture));
futures.emplace_back(std::move(rightFuture));

return folly::collect(futures)
return folly::collectAll(futures)
.via(runner())
.thenValue([this](auto&& status) {
memory::MemoryCheckGuard guard;
.thenValue([this](std::vector<folly::Try<Status>>&& resps) {
// oddStep
UNUSED(status);
for (auto& respVal : resps) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
}
memory::MemoryCheckGuard guard;
return conjunctPath(true);
})
.thenValue([this](auto&& termination) {
Expand Down Expand Up @@ -224,7 +234,7 @@ Status MultiShortestPathExecutor::buildPath(bool reverse) {
DataSet MultiShortestPathExecutor::doConjunct(
const std::vector<std::pair<Interims::iterator, Interims::iterator>>& iters) {
auto buildPaths =
[](const std::vector<Path>& leftPaths, const std::vector<Path>& rightPaths, DataSet& ds) {
[this](const std::vector<Path>& leftPaths, const std::vector<Path>& rightPaths, DataSet& ds) {
for (const auto& leftPath : leftPaths) {
for (const auto& rightPath : rightPaths) {
auto forwardPath = leftPath;
Expand All @@ -234,6 +244,9 @@ DataSet MultiShortestPathExecutor::doConjunct(
Row row;
row.values.emplace_back(std::move(forwardPath));
ds.rows.emplace_back(std::move(row));
if (singleShortest_) {
return;
}
}
}
};
Expand All @@ -247,6 +260,9 @@ DataSet MultiShortestPathExecutor::doConjunct(
for (const auto& rightPath : rightPaths) {
for (auto found = range.first; found != range.second; ++found) {
if (found->second.first == rightPath.first) {
if (singleShortest_ && !found->second.second) {
break;
}
buildPaths(leftPath.second, rightPath.second, ds);
found->second.second = false;
}
Expand Down Expand Up @@ -312,25 +328,55 @@ folly::Future<bool> MultiShortestPathExecutor::conjunctPath(bool oddStep) {
futures.emplace_back(std::move(future));
}

return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) {
memory::MemoryCheckGuard guard;
for (auto& resp : resps) {
currentDs_.append(std::move(resp));
}
return folly::collectAll(futures).via(runner()).thenValue(
[this](std::vector<folly::Try<DataSet>>&& resps) {
memory::MemoryCheckGuard guard;
std::unordered_map<Value, std::unordered_set<Value>> uniquePath;
for (auto& respVal : resps) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
auto ds = std::move(respVal).value();
if (singleShortest_) {
for (auto& row : ds.rows) {
auto& pathVal = row.values.front();
auto& path = pathVal.getPath();
auto& src = path.src.vid;
auto& dst = path.steps.back().dst.vid;
auto findSrc = uniquePath.find(src);
if (findSrc == uniquePath.end()) {
uniquePath[src] = {dst};
currentDs_.rows.emplace_back(std::move(row));
} else {
if (findSrc->second.find(dst) == findSrc->second.end()) {
findSrc->second.emplace(dst);
currentDs_.rows.emplace_back(std::move(row));
}
}
}
} else {
currentDs_.append(std::move(ds));
}
}

for (auto iter = terminationMap_.begin(); iter != terminationMap_.end();) {
if (!iter->second.second) {
iter = terminationMap_.erase(iter);
} else {
++iter;
}
}
if (terminationMap_.empty()) {
ectx_->setValue(terminationVar_, true);
return true;
}
return false;
});
for (auto iter = terminationMap_.begin(); iter != terminationMap_.end();) {
if (!iter->second.second) {
iter = terminationMap_.erase(iter);
} else {
++iter;
}
}
if (terminationMap_.empty()) {
ectx_->setValue(terminationVar_, true);
return true;
}
return false;
});
}

void MultiShortestPathExecutor::setNextStepVid(const Interims& paths, const string& var) {
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/algo/MultiShortestPathExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class MultiShortestPathExecutor final : public Executor {

private:
const MultiShortestPath* pathNode_{nullptr};
bool singleShortest_{false};
size_t step_{1};
std::string terminationVar_;
// {src, <dst, true>}
Expand Down
6 changes: 0 additions & 6 deletions src/graph/executor/query/DataCollectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ Status DataCollectExecutor::rowBasedMove(const std::vector<std::string>& vars) {
for (; seqIter->valid(); seqIter->next()) {
ds.rows.emplace_back(seqIter->moveRow());
}
} else {
return Status::Error("Iterator should be kind of SequentialIter.");
}
}
result_.setDataSet(std::move(ds));
Expand All @@ -140,10 +138,6 @@ Status DataCollectExecutor::collectAllPaths(const std::vector<std::string>& vars
for (; seqIter->valid(); seqIter->next()) {
ds.rows.emplace_back(seqIter->moveRow());
}
} else {
std::stringstream msg;
msg << "Iterator should be kind of SequentialIter, but was: " << iter->kind();
return Status::Error(msg.str());
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/graph/planner/ngql/PathPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ SubPlan PathPlanner::singlePairPlan(PlanNode* left, PlanNode* right) {
path->setRightVidVar(pathCtx_->toVidsVar);
path->setColNames({kPathStr});
path->setTerminateEarlyVar(terminateEarlyVar);
path->setSingleShortest(pathCtx_->singleShortest);

auto* loopCondition = singlePairLoopCondition(steps, path->outputVar(), terminateEarlyVar);
auto* loop = Loop::make(qctx, nullptr, path, loopCondition);
Expand Down Expand Up @@ -220,6 +221,7 @@ SubPlan PathPlanner::multiPairPlan(PlanNode* left, PlanNode* right) {
path->setLeftVidVar(pathCtx_->fromVidsVar);
path->setRightVidVar(pathCtx_->toVidsVar);
path->setTerminationVar(terminationVar);
path->setSingleShortest(pathCtx_->singleShortest);
path->setColNames({kPathStr});

SubPlan loopDep = loopDepPlan();
Expand Down
2 changes: 2 additions & 0 deletions src/graph/planner/plan/Algo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ std::unique_ptr<PlanNodeDescription> BFSShortestPath::explain() const {
addDescription("LeftNextVidVar", folly::toJson(util::toJson(leftVidVar_)), desc.get());
addDescription("RightNextVidVar", folly::toJson(util::toJson(rightVidVar_)), desc.get());
addDescription("steps", folly::toJson(util::toJson(steps_)), desc.get());
addDescription("singleShortest", folly::toJson(util::toJson(singleShortest_)), desc.get());
return desc;
}

Expand All @@ -53,6 +54,7 @@ std::unique_ptr<PlanNodeDescription> MultiShortestPath::explain() const {
addDescription("LeftNextVidVar", folly::toJson(util::toJson(leftVidVar_)), desc.get());
addDescription("RightNextVidVar", folly::toJson(util::toJson(rightVidVar_)), desc.get());
addDescription("steps", folly::toJson(util::toJson(steps_)), desc.get());
addDescription("singleShortest", folly::toJson(util::toJson(singleShortest_)), desc.get());
return desc;
}

Expand Down
18 changes: 18 additions & 0 deletions src/graph/planner/plan/Algo.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ class MultiShortestPath : public BinaryInputNode {
return terminationVar_;
}

bool singleShortest() const {
return singleShortest_;
}

void setLeftVidVar(const std::string& var) {
leftVidVar_ = var;
}
Expand All @@ -51,6 +55,10 @@ class MultiShortestPath : public BinaryInputNode {
terminationVar_ = var;
}

void setSingleShortest(bool singleShortest) {
singleShortest_ = singleShortest;
}

std::unique_ptr<PlanNodeDescription> explain() const override;

private:
Expand All @@ -59,6 +67,7 @@ class MultiShortestPath : public BinaryInputNode {
: BinaryInputNode(qctx, Kind::kMultiShortestPath, left, right), steps_(steps) {}

private:
bool singleShortest_{false};
size_t steps_{0};
std::string leftVidVar_;
std::string rightVidVar_;
Expand Down Expand Up @@ -87,6 +96,10 @@ class BFSShortestPath : public BinaryInputNode {
return terminateEarlyVar_;
}

bool singleShortest() const {
return singleShortest_;
}

void setLeftVidVar(const std::string& var) {
leftVidVar_ = var;
}
Expand All @@ -99,6 +112,10 @@ class BFSShortestPath : public BinaryInputNode {
terminateEarlyVar_ = var;
}

void setSingleShortest(bool singleShortest) {
singleShortest_ = singleShortest;
}

std::unique_ptr<PlanNodeDescription> explain() const override;

private:
Expand All @@ -107,6 +124,7 @@ class BFSShortestPath : public BinaryInputNode {
: BinaryInputNode(qctx, Kind::kBFSShortest, left, right), steps_(steps) {}

private:
bool singleShortest_{false};
std::string leftVidVar_;
std::string rightVidVar_;
std::string terminateEarlyVar_;
Expand Down
4 changes: 4 additions & 0 deletions src/graph/planner/plan/Query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,10 @@ std::unique_ptr<PlanNodeDescription> DataCollect::explain() const {
PlanNode* DataCollect::clone() const {
auto* newDataCollect = DataCollect::make(qctx_, kind_);
newDataCollect->cloneMembers(*this);
// when cloning, the number of dependencies will be lost, needs to be added manually
for (size_t i = 0; i < numDeps(); ++i) {
newDataCollect->addDep(nullptr);
}
return newDataCollect;
}

Expand Down
Loading