Skip to content

Commit

Permalink
[#3220] Track replicated batches at transaction participant
Browse files Browse the repository at this point in the history
Summary:
To implement the "transaction sealing" protocol, we should track
transactional batches replicated at each transaction participant.  So
with this diff, YBTransaction now tracks all batches sent to participant
tablets as part of the transaction, and assigns an index to each batch.
When a transactional batch is replicated, we add its index to the set of
indexes of replicated batches for the transaction at the transaction
participant.

Test Plan: ybd debug --gtest_filter SealTxnTest.NumBatches*

Reviewers: timur, dmitry, mikhail

Reviewed By: mikhail

Subscribers: ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D7743
  • Loading branch information
spolitov committed Jan 27, 2020
1 parent 7d65351 commit 0f0aa07
Show file tree
Hide file tree
Showing 33 changed files with 584 additions and 231 deletions.
1 change: 1 addition & 0 deletions src/yb/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ ADD_YB_TEST(ql-list-test)
ADD_YB_TEST(ql-tablet-test)
ADD_YB_TEST(ql-transaction-test)
ADD_YB_TEST(ql-stress-test)
ADD_YB_TEST(seal-txn-test)
ADD_YB_TEST(snapshot-txn-test)
ADD_YB_TEST(serializable-txn-test)
ADD_YB_TEST(tablet_rpc-test)
3 changes: 3 additions & 0 deletions src/yb/client/async_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ AsyncRpcBase<Req, Resp>::AsyncRpcBase(AsyncRpcData* data, YBConsistencyLevel con
}
}
}
if (!ops_.empty()) {
req_.set_batch_idx(ops_.front()->batch_idx);
}
auto& transaction_metadata = batcher_->transaction_metadata();
if (!transaction_metadata.transaction_id.is_nil()) {
SetTransactionMetadata(transaction_metadata, &req_);
Expand Down
51 changes: 10 additions & 41 deletions src/yb/client/batcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,6 @@
#include "yb/util/flag_tags.h"
#include "yb/util/logging.h"

DEFINE_bool(redis_allow_reads_from_followers, false,
"If true, the read will be served from the closest replica in the same AZ, which can "
"be a follower.");
TAG_FLAG(redis_allow_reads_from_followers, evolving);
TAG_FLAG(redis_allow_reads_from_followers, runtime);

// When this flag is set to false and we have separate errors for operation, then batcher would
// report IO Error status. Otherwise we will try to combine errors from separate operation to
// status of batch. Useful in tests, when we don't need complex error analysis.
Expand Down Expand Up @@ -349,7 +343,7 @@ void Batcher::CombineErrorUnlocked(const InFlightOpPtr& in_flight_op, const Stat
error_collector_->AddError(in_flight_op->yb_op, status);
if (FLAGS_combine_batcher_errors) {
if (combined_error_.ok()) {
combined_error_ = status;
combined_error_ = status.CloneAndPrepend(in_flight_op->ToString());
} else if (!combined_error_.IsCombined() && combined_error_.code() != status.code()) {
combined_error_ = STATUS(Combined, "Multiple failures");
}
Expand Down Expand Up @@ -449,38 +443,12 @@ void Batcher::TabletLookupFinished(

void Batcher::TransactionReady(const Status& status, const BatcherPtr& self) {
if (status.ok()) {
ExecuteOperations();
ExecuteOperations(Initial::kFalse);
} else {
Abort(status);
}
}

YB_DEFINE_ENUM(OpGroup, (kWrite)(kLeaderRead)(kConsistentPrefixRead));

namespace {
inline bool IsOkToReadFromFollower(const InFlightOpPtr& op) {
return op->yb_op->type() == YBOperation::Type::REDIS_READ &&
FLAGS_redis_allow_reads_from_followers;
}

inline bool IsQLConsistentPrefixRead(const InFlightOpPtr& op) {
return op->yb_op->type() == YBOperation::Type::QL_READ &&
std::static_pointer_cast<YBqlReadOp>(op->yb_op)->yb_consistency_level() ==
YBConsistencyLevel::CONSISTENT_PREFIX;
}
} // namespace

OpGroup GetOpGroup(const InFlightOpPtr& op) {
if (!op->yb_op->read_only()) {
return OpGroup::kWrite;
}
if (IsOkToReadFromFollower(op) || IsQLConsistentPrefixRead(op)) {
return OpGroup::kConsistentPrefixRead;
}

return OpGroup::kLeaderRead;
}

void Batcher::FlushBuffersIfReady() {
// We're only ready to flush if both of the following conditions are true:
// 1. The batcher is in the "resolving tablets" state (i.e. FlushAsync was called).
Expand Down Expand Up @@ -509,8 +477,8 @@ void Batcher::FlushBuffersIfReady() {
ops_queue_.end(),
[](const InFlightOpPtr& lhs, const InFlightOpPtr& rhs) {
if (lhs->tablet.get() == rhs->tablet.get()) {
auto lgroup = GetOpGroup(lhs);
auto rgroup = GetOpGroup(rhs);
auto lgroup = lhs->yb_op->group();
auto rgroup = rhs->yb_op->group();
if (lgroup != rgroup) {
return lgroup < rgroup;
}
Expand All @@ -519,10 +487,10 @@ void Batcher::FlushBuffersIfReady() {
return lhs->tablet.get() < rhs->tablet.get();
});

ExecuteOperations();
ExecuteOperations(Initial::kTrue);
}

void Batcher::ExecuteOperations() {
void Batcher::ExecuteOperations(Initial initial) {
auto transaction = this->transaction();
if (transaction) {
// If this Batcher is executed in context of transaction,
Expand All @@ -533,6 +501,7 @@ void Batcher::ExecuteOperations() {
if (!transaction->Prepare(ops_queue_,
force_consistent_read_,
deadline_,
initial,
std::bind(&Batcher::TransactionReady, this, _1, BatcherPtr(this)),
&transaction_metadata_)) {
return;
Expand Down Expand Up @@ -564,9 +533,9 @@ void Batcher::ExecuteOperations() {

// Now flush the ops for each tablet.
auto start = ops_queue_.begin();
auto start_group = GetOpGroup(*start);
auto start_group = (**start).yb_op->group();
for (auto it = start; it != ops_queue_.end(); ++it) {
auto it_group = GetOpGroup(*it);
auto it_group = (**it).yb_op->group();
// Aggregate and flush the ops so far if either:
// - we reached the next tablet or group
if ((**it).tablet.get() != (**start).tablet.get() ||
Expand Down Expand Up @@ -643,7 +612,7 @@ std::shared_ptr<AsyncRpc> Batcher::CreateRpc(
// Split the read operations according to consistency levels since based on consistency
// levels the read algorithm would differ.
InFlightOps ops(begin, end);
auto op_group = GetOpGroup(*begin);
auto op_group = (**begin).yb_op->group();
AsyncRpcData data{this, tablet, allow_local_calls_in_curr_thread, need_consistent_read,
std::move(ops)};
switch (op_group) {
Expand Down
3 changes: 2 additions & 1 deletion src/yb/client/batcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ class Batcher : public RefCountedThreadSafe<Batcher> {

void TransactionReady(const Status& status, const BatcherPtr& self);

void ExecuteOperations();
// initial - whether this method is called first time for this batch.
void ExecuteOperations(Initial initial);

// See note about lock ordering in batcher.cc
mutable simple_spinlock mutex_;
Expand Down
6 changes: 3 additions & 3 deletions src/yb/client/client-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -480,12 +480,12 @@ TEST_F(ClientTest, TestListTables) {
return n1.ToString() < n2.ToString();
});
ASSERT_EQ(2 + master::kNumSystemTablesWithTxn, tables.size());
ASSERT_EQ(kTableName, tables[0]) << "Tables:" << ToString(tables);
ASSERT_EQ(kTable2Name, tables[1]) << "Tables:" << ToString(tables);
ASSERT_EQ(kTableName, tables[0]) << "Tables:" << AsString(tables);
ASSERT_EQ(kTable2Name, tables[1]) << "Tables:" << AsString(tables);
tables.clear();
ASSERT_OK(client_->ListTables(&tables, "testtb2"));
ASSERT_EQ(1, tables.size());
ASSERT_EQ(kTable2Name, tables[0]) << "Tables:" << ToString(tables);
ASSERT_EQ(kTable2Name, tables[0]) << "Tables:" << AsString(tables);
}

TEST_F(ClientTest, TestListTabletServers) {
Expand Down
4 changes: 3 additions & 1 deletion src/yb/client/client_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ struct YBTableInfo;

typedef std::function<void(std::vector<const TabletId*>*)> LocalTabletFilter;

YB_STRONGLY_TYPED_BOOL(UseCache);
YB_STRONGLY_TYPED_BOOL(ForceConsistentRead);
YB_STRONGLY_TYPED_BOOL(Initial);
YB_STRONGLY_TYPED_BOOL(UseCache);

namespace internal {

Expand Down Expand Up @@ -112,6 +113,7 @@ typedef std::function<void(const Result<internal::RemoteTabletPtr>&)> LookupTabl
typedef std::function<void(const Result<CDCStreamId>&)> CreateCDCStreamCallback;

class AsyncClientInitialiser;

} // namespace client
} // namespace yb

Expand Down
4 changes: 4 additions & 0 deletions src/yb/client/in_flight_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ struct InFlightOp {
// order of operations. This is important when multiple operations act on the same row.
int sequence_number_;

// Set only for the first operation in group.
// Operations are groupped by tablet and operation kind (write, leader read, follower read).
int64_t batch_idx = -1;

std::string ToString() const;
};

Expand Down
105 changes: 105 additions & 0 deletions src/yb/client/seal-txn-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/client/session.h"
#include "yb/client/transaction.h"
#include "yb/client/txn-test-base.h"

#include "yb/tablet/tablet_peer.h"

DECLARE_int32(TEST_write_rejection_percentage);
DECLARE_bool(TEST_fail_on_replicated_batch_idx_set_in_txn_record);
DECLARE_bool(enable_transaction_sealing);

namespace yb {
namespace client {

class SealTxnTest : public TransactionTestBase {
protected:
void SetUp() override {
FLAGS_enable_transaction_sealing = true;

SetIsolationLevel(IsolationLevel::SNAPSHOT_ISOLATION);
TransactionTestBase::SetUp();
}

void TestNumBatches(bool restart);
};

// Writes some data as part of transaction and check that batches are correcly tracked by
// transaction participant.
void SealTxnTest::TestNumBatches(bool restart) {
auto txn = CreateTransaction();
auto session = CreateSession(txn);

size_t prev_num_non_empty = 0;
for (auto op_type : {WriteOpType::INSERT, WriteOpType::UPDATE}) {
WriteRows(session, /* transaction= */ 0, op_type, Flush::kFalse);
ASSERT_OK(session->Flush());

size_t num_non_empty = 0;
auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders);
for (const auto& peer : peers) {
auto txn_participant = peer->tablet()->transaction_participant();
if (!txn_participant) {
continue;
}
auto replicated_batch_idx_set = txn_participant->TEST_TransactionReplicatedBatches(txn->id());
LOG(INFO) << peer->tablet_id() << ": " << replicated_batch_idx_set.ToString();
if (replicated_batch_idx_set.CountSet() != 0) {
++num_non_empty;
ASSERT_EQ(replicated_batch_idx_set.ToString(),
op_type == WriteOpType::INSERT ? "[0]" : "[0, 1]");
}
}

if (op_type == WriteOpType::INSERT) {
ASSERT_GT(num_non_empty, 0);
if (restart) {
ASSERT_OK(cluster_->RestartSync());
}
} else {
ASSERT_EQ(num_non_empty, prev_num_non_empty);
}
prev_num_non_empty = num_non_empty;
}
}

TEST_F(SealTxnTest, NumBatches) {
TestNumBatches(/* restart= */ false);
}

TEST_F(SealTxnTest, NumBatchesWithRestart) {
TestNumBatches(/* restart= */ true);
}

TEST_F(SealTxnTest, NumBatchesWithRejection) {
FLAGS_TEST_write_rejection_percentage = 75;
TestNumBatches(/* restart= */ false);
}

// Check that we could disable writing information about the number of batches,
// since it is required for backward compatibility.
TEST_F(SealTxnTest, NumBatchesDisable) {
FLAGS_enable_transaction_sealing = false;
FLAGS_TEST_fail_on_replicated_batch_idx_set_in_txn_record = true;

auto txn = CreateTransaction();
auto session = CreateSession(txn);
WriteRows(session);
ASSERT_OK(cluster_->RestartSync());
ASSERT_OK(txn->CommitFuture().get());
}

} // namespace client
} // namespace yb
Loading

0 comments on commit 0f0aa07

Please sign in to comment.