Skip to content

Commit

Permalink
[#25193] CDC: Fix history retention barriers for replica identity oth…
Browse files Browse the repository at this point in the history
…er than FULL

Summary:
There is a bug in the before image implementation where history retention by CDC was either not set correctly or lifted prematurely for all replica identities other than FULL. Our logic of checking if before image is active or not was simply based on if the replica identity was set to FULL or not. In this diff, we modify this logic to set and update history retention barriers for all replica identities except for CHANGE & NOTHING.

Additionally, we are changing a log from FATAL to DFATAL.
Jira: DB-14379

Test Plan:
Modified existing unit tests for various before image & PG replica identities
./yb_build.sh --cxx-test integration-tests_cdcsdk_consumption_consistent_changes-test --gtest_filter CDCSDKConsumptionConsistentChangesTest.TestCompactionWithReplicaIdentityDefault

Reviewers: xCluster, hsunder, skumar, sumukh.phalgaonkar

Reviewed By: skumar

Subscribers: ybase, ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D40505
  • Loading branch information
siddharth2411 committed Dec 13, 2024
1 parent 31b1d87 commit eebcfe3
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 33 deletions.
31 changes: 21 additions & 10 deletions src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1891,13 +1891,18 @@ void CDCServiceImpl::GetChanges(

// If snapshot operation or before image is enabled, don't allow compaction.
std::optional<HybridTime> cdc_sdk_safe_time;
if (record.GetCheckpointType() == EXPLICIT ) {
if (record.GetCheckpointType() == EXPLICIT) {
if (snapshot_bootstrap) {
cdc_sdk_safe_time = HybridTime::FromPB(resp->safe_hybrid_time());
} else {
if (cdc_sdk_explicit_safe_time != 0) {
cdc_sdk_safe_time = HybridTime::FromPB(cdc_sdk_explicit_safe_time);
if (resp->safe_hybrid_time() == 0) {
LOG(WARNING) << "Response safe time is 0 for tablet " << tablet_peer->tablet_id()
<< ", stream: " << stream_id
<< ", setting cdc_sdk_safe_time to HybridTime::kInitial";
}
cdc_sdk_safe_time = (resp->safe_hybrid_time() == 0)
? HybridTime::kInitial
: HybridTime::FromPB(resp->safe_hybrid_time());
} else if (cdc_sdk_explicit_safe_time != 0) {
cdc_sdk_safe_time = HybridTime::FromPB(cdc_sdk_explicit_safe_time);
}
} else if (req->safe_hybrid_time() != -1) {
cdc_sdk_safe_time = HybridTime::FromPB(req->safe_hybrid_time());
Expand Down Expand Up @@ -2639,7 +2644,9 @@ Result<bool> CDCServiceImpl::CheckBeforeImageActive(
}

if (replica_identity_map.find(table_id) != replica_identity_map.end()) {
if (replica_identity_map.at(table_id) == PgReplicaIdentity::FULL) {
auto table_replica_identity = replica_identity_map.at(table_id);
if (table_replica_identity != PgReplicaIdentity::CHANGE &&
table_replica_identity != PgReplicaIdentity::NOTHING) {
is_before_image_active = true;
break;
}
Expand All @@ -2650,16 +2657,20 @@ Result<bool> CDCServiceImpl::CheckBeforeImageActive(
} else {
auto table_id = tablet_peer->tablet_metadata()->table_id();
if (replica_identity_map.find(table_id) != replica_identity_map.end()) {
is_before_image_active = (replica_identity_map.at(table_id) == PgReplicaIdentity::FULL);
auto table_replica_identity = replica_identity_map.at(table_id);
if (table_replica_identity != PgReplicaIdentity::CHANGE &&
table_replica_identity != PgReplicaIdentity::NOTHING) {
is_before_image_active = true;
}
} else {
return STATUS_FORMAT(NotFound, "Replica identity not found for table: $0 ", table_id);
}
}

} else {
is_before_image_active =
(stream_metadata.GetRecordType() == CDCRecordType::ALL ||
stream_metadata.GetRecordType() == CDCRecordType::PG_FULL);
auto stream_record_type = stream_metadata.GetRecordType();
is_before_image_active = stream_record_type != CDCRecordType::CHANGE &&
stream_record_type != CDCRecordType::PG_NOTHING;
}

return is_before_image_active;
Expand Down
2 changes: 1 addition & 1 deletion src/yb/cdc/cdcsdk_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3125,7 +3125,7 @@ Status GetChangesForCDCSDK(

if (!snapshot_operation && !CheckResponseSafeTimeCorrectness(
last_read_wal_op_record_time, safe_time, is_entire_wal_read)) {
LOG(FATAL) << "Stream_id: " << stream_id << ", tablet_id: " << tablet_id
LOG(DFATAL) << "Stream_id: " << stream_id << ", tablet_id: " << tablet_id
<< ", response safe time: " << safe_time
<< " is greater than last read WAL OP's record time: "
<< last_read_wal_op_record_time
Expand Down
43 changes: 39 additions & 4 deletions src/yb/integration-tests/cdcsdk_before_image-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class CDCSDKBeforeImageTest : public CDCSDKYsqlTest {
// These tests work on older RECORD_TYPE support, so we disable replica identity support here so
// that the record_type mode is used.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_yb_enable_replica_identity) = false;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_timestamp_history_retention_interval_sec) = 0;
}
};

Expand Down Expand Up @@ -263,6 +264,7 @@ TEST_F(CDCSDKBeforeImageTest, YB_DISABLE_TEST_IN_TSAN(TestSingleShardUpdateBefor
// For CHANGE mode
TEST_F(CDCSDKBeforeImageTest, YB_DISABLE_TEST_IN_TSAN(TestSingleShardUpdateBeforeImageChangeMode)) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_timestamp_history_retention_interval_sec) = 0;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1;
ASSERT_OK(SetUpWithParams(1, 1, true /* colocated */));

auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
Expand Down Expand Up @@ -318,6 +320,7 @@ TEST_F(CDCSDKBeforeImageTest, YB_DISABLE_TEST_IN_TSAN(TestSingleShardUpdateBefor
TEST_F(CDCSDKBeforeImageTest, YB_DISABLE_TEST_IN_TSAN(TestSingleShardUpdateBeforeImageAllMode)) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_enable_postgres_replica_identity) = false;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_timestamp_history_retention_interval_sec) = 0;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1;
ASSERT_OK(SetUpWithParams(1, 1, true /* colocated */));

auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
Expand All @@ -337,6 +340,15 @@ TEST_F(CDCSDKBeforeImageTest, YB_DISABLE_TEST_IN_TSAN(TestSingleShardUpdateBefor
ASSERT_OK(conn.Execute("UPDATE test_table SET value_1 = 3 WHERE key = 1"));
ASSERT_OK(conn.Execute("DELETE FROM test_table WHERE key = 1"));

// Wait for sometime for UpdatePeersAndMetrics to move the barriers.
SleepFor(MonoDelta::FromSeconds(3 * FLAGS_update_min_cdc_indices_interval_secs));
ASSERT_OK(WaitForFlushTables(
{table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30,
/* is_compaction = */ false));
// The DocDB entry count shouldnt change after compaction as CDC is holding the history barrier
// due to before image mode ALL.
WaitForCompaction(table, true /* expect_equal_entries_after_compaction */);

// The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE in that order.
const uint32_t expected_count[] = {1, 1, 1, 1, 0, 0};
const uint32_t expected_count_with_packed_row[] = {1, 1, 1, 1, 0, 0};
Expand Down Expand Up @@ -373,6 +385,7 @@ TEST_F(CDCSDKBeforeImageTest, YB_DISABLE_TEST_IN_TSAN(TestSingleShardUpdateBefor
TEST_F(CDCSDKBeforeImageTest, YB_DISABLE_TEST_IN_TSAN(TestSingleShardUpdateBeforeImageFullMode)) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_enable_postgres_replica_identity) = true;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_timestamp_history_retention_interval_sec) = 0;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1;
ASSERT_OK(SetUpWithParams(1, 1, true /* colocated */));

auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
Expand All @@ -392,6 +405,15 @@ TEST_F(CDCSDKBeforeImageTest, YB_DISABLE_TEST_IN_TSAN(TestSingleShardUpdateBefor
ASSERT_OK(conn.Execute("UPDATE test_table SET value_1 = 3 WHERE key = 1"));
ASSERT_OK(conn.Execute("DELETE FROM test_table WHERE key = 1"));

// Wait for sometime for UpdatePeersAndMetrics to move the barriers.
SleepFor(MonoDelta::FromSeconds(3 * FLAGS_update_min_cdc_indices_interval_secs));
ASSERT_OK(WaitForFlushTables(
{table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30,
/* is_compaction = */ false));
// The DocDB entry count shouldnt change after compaction as CDC is holding the history barrier
// due to PG_FULL.
WaitForCompaction(table, true /* expect_equal_entries_after_compaction */);

// The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE in that order.
const uint32_t expected_count[] = {1, 1, 1, 1, 0, 0};
const uint32_t expected_count_with_packed_row[] = {1, 1, 1, 1, 0, 0};
Expand Down Expand Up @@ -598,6 +620,7 @@ TEST_F(CDCSDKBeforeImageTest, YB_DISABLE_TEST_IN_TSAN(
TestSingleShardUpdateBeforeImageDefaultMode)) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_enable_postgres_replica_identity) = true;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_timestamp_history_retention_interval_sec) = 0;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1;
ASSERT_OK(SetUpWithParams(1, 1, true /* colocated */));

auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
Expand All @@ -617,6 +640,15 @@ TEST_F(CDCSDKBeforeImageTest, YB_DISABLE_TEST_IN_TSAN(
ASSERT_OK(conn.Execute("UPDATE test_table SET value_1 = 3 WHERE key = 1;"));
ASSERT_OK(conn.Execute("DELETE FROM test_table WHERE key = 1"));

// Wait for sometime for UpdatePeersAndMetrics to move the barriers.
SleepFor(MonoDelta::FromSeconds(3 * FLAGS_update_min_cdc_indices_interval_secs));
ASSERT_OK(WaitForFlushTables(
{table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30,
/* is_compaction = */ false));
// The DocDB entry count shouldnt change after compaction as CDC is holding the history barrier
// due to PG_DEFAULT.
WaitForCompaction(table, true /* expect_equal_entries_after_compaction */);

// The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE in that order.
const uint32_t expected_count[] = {1, 1, 1, 1, 0, 0};
const uint32_t expected_count_with_packed_row[] = {1, 1, 1, 1, 0, 0};
Expand Down Expand Up @@ -653,6 +685,7 @@ TEST_F(CDCSDKBeforeImageTest, YB_DISABLE_TEST_IN_TSAN(
TestSingleShardUpdateBeforeImageNothingMode)) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_enable_postgres_replica_identity) = true;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_timestamp_history_retention_interval_sec) = 0;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1;
ASSERT_OK(SetUpWithParams(1, 1, true /* colocated */));

auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
Expand Down Expand Up @@ -1309,8 +1342,9 @@ TEST_F(
first_get_changes = false;

} else {
change_resp =
ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &change_resp.cdc_sdk_checkpoint()));
change_resp = ASSERT_RESULT(GetChangesFromCDC(
stream_id, tablets, &change_resp.cdc_sdk_checkpoint(), 0 /* tablet_idx */,
change_resp.safe_hybrid_time()));
}

if (change_resp.cdc_sdk_proto_records_size() == 0) {
Expand Down Expand Up @@ -1403,8 +1437,9 @@ TEST_F(
first_get_changes = false;

} else {
change_resp =
ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &change_resp.cdc_sdk_checkpoint()));
change_resp = ASSERT_RESULT(GetChangesFromCDC(
stream_id, tablets, &change_resp.cdc_sdk_checkpoint(), 0 /* tablet_idx */,
change_resp.safe_hybrid_time()));
}

if (change_resp.cdc_sdk_proto_records_size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3660,5 +3660,80 @@ TEST_F(CDCSDKConsumptionConsistentChangesTest, TestColocationWithIndexes) {
}
}

TEST_F(CDCSDKConsumptionConsistentChangesTest, TestCompactionWithReplicaIdentityDefault) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_checkpoint_update_interval_ms) = 0;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_timestamp_history_retention_interval_sec) = 0;
ASSERT_OK(SetUpWithParams(
/* replication_factor */ 3, /* num_masters */ 1, /* colocated */ false,
/* cdc_populate_safepoint_record */ true));
auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName));
google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, nullptr));
ASSERT_EQ(tablets.size(), 1);

auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
ASSERT_OK(conn.Execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT"));

xrepl::StreamId stream_id = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot());
auto expected_row = ReadFromCdcStateTable(stream_id, tablets[0].tablet_id());
if (!expected_row.ok()) {
FAIL();
}
ASSERT_GE((*expected_row).op_id.term, 0);
ASSERT_GE((*expected_row).op_id.index, 0);
ASSERT_NE((*expected_row).cdc_sdk_safe_time, HybridTime::kInvalid);
ASSERT_GE((*expected_row).cdc_sdk_latest_active_time, 0);

// Assert that the safe time is not invalid in the tablet_peers
AssertSafeTimeAsExpectedInTabletPeers(tablets[0].tablet_id(), (*expected_row).cdc_sdk_safe_time);

ASSERT_OK(WriteRows(1 /* start */, 2 /* end */, &test_cluster_));
ASSERT_OK(InitVirtualWAL(stream_id, {table.table_id()}, kVWALSessionId1));
int expected_dml_records = 1;
auto get_consistent_changes_resp = ASSERT_RESULT(GetAllPendingTxnsFromVirtualWAL(
stream_id, {table.table_id()}, expected_dml_records, false /* init_virtual_wal */,
kVWALSessionId1));
LOG(INFO) << "Got " << get_consistent_changes_resp.records.size() << " records.";
// (BEGIN + DML + COMMIT )
ASSERT_EQ(get_consistent_changes_resp.records.size(), 3);

// Additional call to send explicit checkpoint
auto resp = ASSERT_RESULT(GetConsistentChangesFromCDC(stream_id, kVWALSessionId1));
ASSERT_EQ(resp.cdc_sdk_proto_records_size(), 0);

ASSERT_OK(UpdateRows(1 /* key */, 6 /* value */, &test_cluster_));
ASSERT_OK(UpdateRows(1 /* key */, 10 /* value */, &test_cluster_));
LOG(INFO) << "Sleep for UpdatePeersAndMetrics to move barriers";
SleepFor(MonoDelta::FromSeconds(2 * FLAGS_update_min_cdc_indices_interval_secs));
auto peers = ListTabletPeers(test_cluster(), ListPeersFilter::kLeaders);
auto count_before_compaction = CountEntriesInDocDB(peers, table.table_id());
int count_after_compaction;
ASSERT_NOK(WaitFor(
[&]() {
auto result = test_cluster_.mini_cluster_->CompactTablets();
if (!result.ok()) {
return false;
}
count_after_compaction = CountEntriesInDocDB(peers, table.table_id());
if (count_after_compaction < count_before_compaction) {
return true;
}
return false;
},
MonoDelta::FromSeconds(15), "Compaction is restricted for the stream."));
LOG(INFO) << "count_before_compaction: " << count_before_compaction
<< " count_after_compaction: " << count_after_compaction;
ASSERT_EQ(count_after_compaction, count_before_compaction);

expected_dml_records = 2;
get_consistent_changes_resp = ASSERT_RESULT(GetAllPendingTxnsFromVirtualWAL(
stream_id, {table.table_id()}, expected_dml_records, false /* init_virtual_wal */,
kVWALSessionId1));
LOG(INFO) << "Got " << get_consistent_changes_resp.records.size() << " records.";
// 2 * (BEGIN + DML + COMMIT )
ASSERT_EQ(get_consistent_changes_resp.records.size(), 6);
}

} // namespace cdc
} // namespace yb
1 change: 1 addition & 0 deletions src/yb/integration-tests/cdcsdk_replica_identity-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class CDCSDKReplicaIdentityTest : public CDCSDKYsqlTest {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_max_replication_slots) = 500;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_yb_enable_replica_identity) = true;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_enable_implicit_checkpointing) = true;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_timestamp_history_retention_interval_sec) = 0;

// TODO(#23000) Rationalize the tests to run with consistent / non-consistent snapshot streams.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = false;
Expand Down
10 changes: 5 additions & 5 deletions src/yb/integration-tests/cdcsdk_ysql_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3671,7 +3671,8 @@ Result<string> CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) {
ValidateColumnCounts(change_resp, 2);
}

void CDCSDKYsqlTest::WaitForCompaction(YBTableName table) {
void CDCSDKYsqlTest::WaitForCompaction(
YBTableName table, bool expect_equal_entries_after_compaction) {
auto peers = ListTabletPeers(test_cluster(), ListPeersFilter::kLeaders);
int count_before_compaction = CountEntriesInDocDB(peers, table.table_id());
int count_after_compaction = 0;
Expand All @@ -3682,10 +3683,9 @@ Result<string> CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) {
return false;
}
count_after_compaction = CountEntriesInDocDB(peers, table.table_id());
if (count_after_compaction < count_before_compaction) {
return true;
}
return false;
return (expect_equal_entries_after_compaction &&
count_before_compaction == count_after_compaction) ||
count_after_compaction < count_before_compaction;
},
MonoDelta::FromSeconds(60), "Expected compaction did not happen"));
LOG(INFO) << "count_before_compaction: " << count_before_compaction
Expand Down
2 changes: 1 addition & 1 deletion src/yb/integration-tests/cdcsdk_ysql_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {
void CDCSDKAlterWithSysCatalogCompaction(bool packed_row);
void CDCSDKIntentsBatchReadWithAlterAndTabletLeaderSwitch(bool packed_row);

void WaitForCompaction(YBTableName table);
void WaitForCompaction(YBTableName table, bool expect_equal_entries_after_compaction = false);
void VerifySnapshotOnColocatedTables(
xrepl::StreamId stream_id,
google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets,
Expand Down
30 changes: 18 additions & 12 deletions src/yb/master/xrepl_catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ Status CatalogManager::CreateNewCdcsdkStream(

bool has_consistent_snapshot_option = false;
bool consistent_snapshot_option_use = false;
bool has_replica_identity_full = false;
bool is_history_required_for_replica_identity = false;
bool disable_dynamic_tables = false;

CDCStreamInfoPtr stream;
Expand Down Expand Up @@ -973,10 +973,13 @@ Status CatalogManager::CreateNewCdcsdkStream(
auto schema = VERIFY_RESULT(table->GetSchema());
PgReplicaIdentity replica_identity = schema.table_properties().replica_identity();

// If atleast one of the tables in the stream has replica identity full, we will set the
// history cutoff. UpdatepPeersAndMetrics thread will remove the retention barriers for
// the tablets belonging to the tables with "non-full" replica identity.
has_replica_identity_full |= (replica_identity == PgReplicaIdentity::FULL);
// If atleast one of the tables in the stream has replica identity other than CHANGE &
// NOTHING, we will set the history cutoff. UpdatepPeersAndMetrics thread will remove the
// retention barriers for the tablets belonging to the tables with replica identity CHANGE or
// NOTHING.
is_history_required_for_replica_identity |=
(replica_identity != PgReplicaIdentity::CHANGE &&
replica_identity != PgReplicaIdentity::NOTHING);

metadata->mutable_replica_identity_map()->insert({table_id, replica_identity});
VLOG(1) << "Storing replica identity: " << replica_identity
Expand Down Expand Up @@ -1085,11 +1088,13 @@ Status CatalogManager::CreateNewCdcsdkStream(
// that all of the ALTER TABLE operations have completed.

uint64 consistent_snapshot_time = 0;
bool record_type_option_all = false;
bool is_history_required_for_record_type = false;
if (!FLAGS_ysql_yb_enable_replica_identity || !has_replication_slot_name) {
for (auto option : req.options()) {
if (option.key() == cdc::kRecordType) {
record_type_option_all = option.value() == CDCRecordType_Name(cdc::CDCRecordType::ALL);
is_history_required_for_record_type =
option.value() != CDCRecordType_Name(cdc::CDCRecordType::CHANGE) &&
option.value() != CDCRecordType_Name(cdc::CDCRecordType::PG_NOTHING);
}
}
}
Expand All @@ -1108,11 +1113,12 @@ Status CatalogManager::CreateNewCdcsdkStream(
TEST_CDCSDKFailCreateStreamRequestIfNeeded("CreateCDCSDKStream::kAfterDummyCDCStateEntries"));

// Step 2: Set retention barriers for all tables.
auto require_history_cutoff =
consistent_snapshot_option_use || record_type_option_all || has_replica_identity_full;
RETURN_NOT_OK(SetAllCDCSDKRetentionBarriers(
req, rpc, epoch, table_ids, stream->StreamId(), has_consistent_snapshot_option,
require_history_cutoff));
auto require_history_cutoff = consistent_snapshot_option_use ||
is_history_required_for_record_type ||
is_history_required_for_replica_identity;
RETURN_NOT_OK(SetAllCDCSDKRetentionBarriers(
req, rpc, epoch, table_ids, stream->StreamId(), has_consistent_snapshot_option,
require_history_cutoff));

RETURN_NOT_OK(
TEST_CDCSDKFailCreateStreamRequestIfNeeded("CreateCDCSDKStream::kAfterRetentionBarriers"));
Expand Down

0 comments on commit eebcfe3

Please sign in to comment.