diff --git a/src/storage/exec/ScanNode.h b/src/storage/exec/ScanNode.h index a0446203682..c5c5a9d997e 100644 --- a/src/storage/exec/ScanNode.h +++ b/src/storage/exec/ScanNode.h @@ -90,7 +90,10 @@ class ScanVertexPropNode : public QueryNode { } auto vertexId = NebulaKeyUtils::getVertexId(vIdLen, key); if (vertexId != currentVertexId && !currentVertexId.empty()) { - collectOneRow(isIntId, vIdLen, currentVertexId); + ret = collectOneRow(isIntId, vIdLen, currentVertexId); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return ret; + } } // collect vertex row currentVertexId = vertexId; if (static_cast(resultDataSet_->rowSize()) >= rowLimit) { @@ -100,7 +103,10 @@ class ScanVertexPropNode : public QueryNode { tagNodes_[tagIdIndex->second]->doExecute(key.toString(), value.toString()); } // iterate key if (static_cast(resultDataSet_->rowSize()) < rowLimit) { - collectOneRow(isIntId, vIdLen, currentVertexId); + ret = collectOneRow(isIntId, vIdLen, currentVertexId); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return ret; + } } cpp2::ScanCursor c; @@ -111,7 +117,9 @@ class ScanVertexPropNode : public QueryNode { return nebula::cpp2::ErrorCode::SUCCEEDED; } - void collectOneRow(bool isIntId, std::size_t vIdLen, const std::string& currentVertexId) { + nebula::cpp2::ErrorCode collectOneRow(bool isIntId, + std::size_t vIdLen, + const std::string& currentVertexId) { List row; nebula::cpp2::ErrorCode ret = nebula::cpp2::ErrorCode::SUCCEEDED; // vertexId is the first column @@ -172,6 +180,7 @@ class ScanVertexPropNode : public QueryNode { tagNode->clear(); } } + return ret; } private: @@ -251,7 +260,10 @@ class ScanEdgePropNode : public QueryNode { } auto value = iter->val(); edgeNodes_[edgeNodeIndex->second]->doExecute(key.toString(), value.toString()); - collectOneRow(isIntId, vIdLen); + ret = collectOneRow(isIntId, vIdLen); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return ret; + } } cpp2::ScanCursor c; @@ -262,9 +274,16 @@ class ScanEdgePropNode : public QueryNode { return nebula::cpp2::ErrorCode::SUCCEEDED; } - void collectOneRow(bool isIntId, std::size_t vIdLen) { + nebula::cpp2::ErrorCode collectOneRow(bool isIntId, std::size_t vIdLen) { List row; nebula::cpp2::ErrorCode ret = nebula::cpp2::ErrorCode::SUCCEEDED; + // Usually there is only one edge node, when all of the egdeNodes are invalid (e.g. ttl + // expired), just skip the row. If we don't skip it, there will be a whole line of empty value. + if (!std::any_of(edgeNodes_.begin(), edgeNodes_.end(), [](const auto& edgeNode) { + return edgeNode->valid(); + })) { + return ret; + } for (auto& edgeNode : edgeNodes_) { ret = edgeNode->collectEdgePropsIfValid( [&row, edgeNode = edgeNode.get(), this]( @@ -312,6 +331,7 @@ class ScanEdgePropNode : public QueryNode { for (auto& edgeNode : edgeNodes_) { edgeNode->clear(); } + return ret; } private: diff --git a/src/storage/query/ScanEdgeProcessor.cpp b/src/storage/query/ScanEdgeProcessor.cpp index 1c63165595d..3e95eccdce6 100644 --- a/src/storage/query/ScanEdgeProcessor.cpp +++ b/src/storage/query/ScanEdgeProcessor.cpp @@ -72,6 +72,7 @@ nebula::cpp2::ErrorCode ScanEdgeProcessor::checkAndBuildContexts(const cpp2::Sca return nullptr; } }); + buildEdgeTTLInfo(); return ret; } diff --git a/src/storage/query/ScanVertexProcessor.cpp b/src/storage/query/ScanVertexProcessor.cpp index 7806daebfdd..6ebdf7f2134 100644 --- a/src/storage/query/ScanVertexProcessor.cpp +++ b/src/storage/query/ScanVertexProcessor.cpp @@ -75,6 +75,7 @@ nebula::cpp2::ErrorCode ScanVertexProcessor::checkAndBuildContexts( return nullptr; } }); + buildTagTTLInfo(); return ret; } diff --git a/src/storage/test/ScanEdgeTest.cpp b/src/storage/test/ScanEdgeTest.cpp index 42c41907f9b..012c78ff89b 100644 --- a/src/storage/test/ScanEdgeTest.cpp +++ b/src/storage/test/ScanEdgeTest.cpp @@ -338,6 +338,59 @@ TEST(ScanEdgeTest, FilterTest) { } } +TEST(ScanEdgeTest, TtlTest) { + FLAGS_mock_ttl_col = true; + + fs::TempDir rootPath("/tmp/GetNeighborsTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + auto totalParts = cluster.getTotalParts(); + ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts)); + + EdgeType serve = 101; + + { + LOG(INFO) << "Scan one edge with some properties in one batch"; + size_t totalRowCount = 0; + auto edge = std::make_pair( + serve, + std::vector{kSrc, kType, kRank, kDst, "teamName", "startYear", "endYear"}); + for (PartitionID partId = 1; partId <= totalParts; partId++) { + auto req = buildRequest({partId}, {""}, {edge}); + auto* processor = ScanEdgeProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + ASSERT_FALSE(resp.get_props()->rows.empty()); + checkResponse(*resp.props_ref(), edge, edge.second.size(), totalRowCount); + } + CHECK_EQ(mock::MockData::serves_.size(), totalRowCount); + } + sleep(FLAGS_mock_ttl_duration + 1); + { + LOG(INFO) << "TTL expired, same request but no data returned"; + auto edge = std::make_pair( + serve, + std::vector{kSrc, kType, kRank, kDst, "teamName", "startYear", "endYear"}); + for (PartitionID partId = 1; partId <= totalParts; partId++) { + auto req = buildRequest({partId}, {""}, {edge}); + auto* processor = ScanEdgeProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + ASSERT_TRUE(resp.get_props()->rows.empty()); + } + } + + FLAGS_mock_ttl_col = false; +} + } // namespace storage } // namespace nebula diff --git a/src/storage/test/ScanVertexTest.cpp b/src/storage/test/ScanVertexTest.cpp index 61f9e006cea..294d02fe69d 100644 --- a/src/storage/test/ScanVertexTest.cpp +++ b/src/storage/test/ScanVertexTest.cpp @@ -501,6 +501,56 @@ TEST(ScanVertexTest, FilterTest) { } } +TEST(ScanVertexTest, TtlTest) { + FLAGS_mock_ttl_col = true; + + fs::TempDir rootPath("/tmp/ScanVertexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + auto totalParts = cluster.getTotalParts(); + ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts)); + + TagID player = 1; + + { + LOG(INFO) << "Scan one tag with some properties in one batch"; + size_t totalRowCount = 0; + auto tag = + std::make_pair(player, std::vector{kVid, kTag, "name", "age", "avgScore"}); + for (PartitionID partId = 1; partId <= totalParts; partId++) { + auto req = buildRequest({partId}, {""}, {tag}); + auto* processor = ScanVertexProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + checkResponse(*resp.props_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount); + } + CHECK_EQ(mock::MockData::players_.size(), totalRowCount); + } + sleep(FLAGS_mock_ttl_duration + 1); + { + LOG(INFO) << "TTL expired, same request but no data returned"; + auto tag = + std::make_pair(player, std::vector{kVid, kTag, "name", "age", "avgScore"}); + for (PartitionID partId = 1; partId <= totalParts; partId++) { + auto req = buildRequest({partId}, {""}, {tag}); + auto* processor = ScanVertexProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + ASSERT_TRUE(resp.get_props()->rows.empty()); + } + } + + FLAGS_mock_ttl_col = false; +} + } // namespace storage } // namespace nebula