Skip to content

Commit

Permalink
#377: Move unique index duplicate-value check to index tablet.
Browse files Browse the repository at this point in the history
Summary: Move duplicate-value check to unique index tablet to fix issue where no error is raised when inserting duplicate values in unique index of a primary key column. To do so, the index info is replicated in tablet metadata and primary key info is added to index metadata.

Test Plan: TestIndex.testUniquePrimaryKeyIndex

Reviewers: mihnea

Reviewed By: mihnea

Subscribers: bogdan, yql, bharat

Differential Revision: https://phabricator.dev.yugabyte.com/D5144
  • Loading branch information
robertpang committed Jul 18, 2018
1 parent 7110b9a commit baef7d0
Show file tree
Hide file tree
Showing 29 changed files with 326 additions and 124 deletions.
34 changes: 32 additions & 2 deletions java/yb-cql/src/test/java/org/yb/cql/TestIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -642,8 +642,8 @@ private void assertInvalidUniqueIndexDML(String query, String indexName) {
fail("InvalidQueryException not thrown for " + query);
} catch (InvalidQueryException e) {
assertTrue(e.getMessage().startsWith(
String.format("SQL error: Execution Error. Duplicate value disallowed by unique index " +
"%s.%s", DEFAULT_TEST_KEYSPACE, indexName)));
String.format("SQL error: Execution Error. Duplicate value disallowed by unique index %s",
indexName)));
}
}

Expand Down Expand Up @@ -690,6 +690,36 @@ public void testUniqueIndex() throws Exception {
session.execute("update test_unique set v1 = 7, v2 = 'e', v3 = 5 where k = 7;");
}

@Test
public void testUniquePrimaryKeyIndex() throws Exception {
// Test unique index on a primary key column.
session.execute("create table test_unique_pk (h1 int, h2 int, r int, v int, " +
"primary key ((h1, h2), r)) with transactions = {'enabled' : true};");
session.execute("create unique index test_unique_pk_by_h2 on test_unique_pk (h2);");
session.execute("create unique index test_unique_pk_by_r on test_unique_pk (r);");

session.execute("insert into test_unique_pk (h1, h2, r, v) values (1, 1, 1, 1);");

// Test inserting duplicate h2 and r values.
assertInvalidUniqueIndexDML(
"insert into test_unique_pk (h1, h2, r, v) values (1, 1, 2, 2);", "test_unique_pk_by_h2");
assertInvalidUniqueIndexDML(
"insert into test_unique_pk (h1, h2, r, v) values (1, 2, 1, 2);", "test_unique_pk_by_r");

// Restart the cluster
miniCluster.restart();
setUpCqlClient();

// Test inserting duplicate h2 and r values again.
assertInvalidUniqueIndexDML(
"insert into test_unique_pk (h1, h2, r, v) values (1, 1, 2, 2);", "test_unique_pk_by_h2");
assertInvalidUniqueIndexDML(
"insert into test_unique_pk (h1, h2, r, v) values (1, 2, 1, 2);", "test_unique_pk_by_r");

// Test inserting non-duplicate h2 and r value.
session.execute("insert into test_unique_pk (h1, h2, r, v) values (1, 2, 2, 2);");
}

@Test
public void testConditionalDML() throws Exception {
// Create test 2 test tables. One with normal secondary index and one with an additional unique
Expand Down
6 changes: 2 additions & 4 deletions java/yb-cql/src/test/java/org/yb/cql/TestReturnsClause.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,7 @@ public void testReturnsStatusWithIndex() throws Exception {
checkReturnStatus("INSERT INTO test_rs_trans(h, r, v1, v2) VALUES (2, 2, 2, 'c')",
columns,
false /* applied */,
"Duplicate value disallowed by unique index " +
DEFAULT_TEST_KEYSPACE + ".test_rs_idx",
"Duplicate value disallowed by unique index test_rs_idx",
"NULL", "NULL", "NULL", "NULL");

// Ensure main table write did not get applied due to the index failure.
Expand Down Expand Up @@ -243,8 +242,7 @@ public void testReturnsStatusWithIndex() throws Exception {
checkReturnStatus("INSERT INTO test_rs_trans_pk(h, r, v1, v2) VALUES (2, 1, 2, 'c')",
columns,
false /* applied */,
"Duplicate value disallowed by unique index " +
DEFAULT_TEST_KEYSPACE + ".test_rs_pk_idx",
"Duplicate value disallowed by unique index test_rs_pk_idx",
"NULL", "NULL", "NULL", "NULL");

// Ensure main table write did not get applied due to the index failure.
Expand Down
4 changes: 4 additions & 0 deletions src/yb/common/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ message SchemaPB {
// columns to the index table columns.
message IndexInfoPB {
optional bytes table_id = 1; // Index table id.
optional bytes indexed_table_id = 8; // Indexed table id.
optional uint32 version = 2; // Index table's schema version.
optional bool is_local = 3 [ default = false ]; // Whether the index is a local index
optional bool is_unique = 7 [ default = false ]; // Whether the index is a unique index
Expand All @@ -228,6 +229,9 @@ message IndexInfoPB {
repeated IndexColumnPB columns = 4; // Indexed and covering columns.
optional uint32 hash_column_count = 5; // Number of hash columns in the index.
optional uint32 range_column_count = 6; // Number of range columns in the index.

repeated uint32 indexed_hash_column_ids = 9; // Hash column ids in the indexed table.
repeated uint32 indexed_range_column_ids = 10; // Range column ids in the indexed table.
}

message HostPortPB {
Expand Down
40 changes: 39 additions & 1 deletion src/yb/common/index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
#include "yb/common/common.pb.h"

using std::vector;
using std::unordered_map;
using google::protobuf::RepeatedField;
using google::protobuf::RepeatedPtrField;
using google::protobuf::uint32;

namespace yb {

Expand All @@ -44,23 +47,36 @@ vector<IndexInfo::IndexColumn> IndexColumnFromPB(
return cols;
}

vector<ColumnId> ColumnIdsFromPB(const RepeatedField<uint32>& ids) {
vector<ColumnId> column_ids;
column_ids.reserve(ids.size());
for (const auto& id : ids) {
column_ids.emplace_back(id);
}
return column_ids;
}

} // namespace

IndexInfo::IndexInfo(const IndexInfoPB& pb)
: table_id_(pb.table_id()),
indexed_table_id_(pb.indexed_table_id()),
schema_version_(pb.version()),
is_local_(pb.is_local()),
is_unique_(pb.is_unique()),
columns_(IndexColumnFromPB(pb.columns())),
hash_column_count_(pb.hash_column_count()),
range_column_count_(pb.range_column_count()) {
range_column_count_(pb.range_column_count()),
indexed_hash_column_ids_(ColumnIdsFromPB(pb.indexed_hash_column_ids())),
indexed_range_column_ids_(ColumnIdsFromPB(pb.indexed_range_column_ids())) {
for (const IndexInfo::IndexColumn &index_col : columns_) {
covered_column_ids_.insert(index_col.indexed_column_id);
}
}

void IndexInfo::ToPB(IndexInfoPB* pb) const {
pb->set_table_id(table_id_);
pb->set_indexed_table_id(indexed_table_id_);
pb->set_version(schema_version_);
pb->set_is_local(is_local_);
pb->set_is_unique(is_unique_);
Expand All @@ -69,6 +85,28 @@ void IndexInfo::ToPB(IndexInfoPB* pb) const {
}
pb->set_hash_column_count(hash_column_count_);
pb->set_range_column_count(range_column_count_);
for (const auto id : indexed_hash_column_ids_) {
pb->add_indexed_hash_column_ids(id);
}
for (const auto id : indexed_range_column_ids_) {
pb->add_indexed_range_column_ids(id);
}
}

vector<ColumnId> IndexInfo::index_key_column_ids() const {
unordered_map<ColumnId, ColumnId> map;
for (const auto column : columns_) {
map[column.indexed_column_id] = column.column_id;
}
vector<ColumnId> ids;
ids.reserve(indexed_hash_column_ids_.size() + indexed_range_column_ids_.size());
for (const auto id : indexed_hash_column_ids_) {
ids.push_back(map[id]);
}
for (const auto id : indexed_range_column_ids_) {
ids.push_back(map[id]);
}
return ids;
}

bool IndexInfo::PrimaryKeyColumnsOnly(const Schema& indexed_schema) const {
Expand Down
14 changes: 14 additions & 0 deletions src/yb/common/index.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class IndexInfo {
void ToPB(IndexInfoPB* pb) const;

const TableId& table_id() const { return table_id_; }
const TableId& indexed_table_id() const { return indexed_table_id_; }
uint32_t schema_version() const { return schema_version_; }
bool is_local() const { return is_local_; }
bool is_unique() const { return is_unique_; }
Expand All @@ -56,6 +57,16 @@ class IndexInfo {
size_t range_column_count() const { return range_column_count_; }
size_t key_column_count() const { return hash_column_count_ + range_column_count_; }

const std::vector<ColumnId>& indexed_hash_column_ids() const {
return indexed_hash_column_ids_;
}
const std::vector<ColumnId>& indexed_range_column_ids() const {
return indexed_range_column_ids_;
}

// Return column ids that are primary key columns of the indexed table.
std::vector<ColumnId> index_key_column_ids() const;

// Index primary key columns of the indexed table only?
bool PrimaryKeyColumnsOnly(const Schema& indexed_schema) const;

Expand All @@ -64,12 +75,15 @@ class IndexInfo {

private:
const TableId table_id_; // Index table id.
const TableId indexed_table_id_; // Indexed table id.
const uint32_t schema_version_ = 0; // Index table's schema version.
const bool is_local_ = false; // Whether this is a local index.
const bool is_unique_ = false; // Whether this is a unique index.
const std::vector<IndexColumn> columns_; // Index columns.
const size_t hash_column_count_ = 0; // Number of hash columns in the index.
const size_t range_column_count_ = 0; // Number of range columns in the index.
const std::vector<ColumnId> indexed_hash_column_ids_; // Hash column ids in the indexed table.
const std::vector<ColumnId> indexed_range_column_ids_; // Range column ids in the indexed table.

// Column ids covered by the index (include indexed columns).
std::unordered_set<ColumnId> covered_column_ids_;
Expand Down
3 changes: 2 additions & 1 deletion src/yb/docdb/doc_operation-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ class DocOperationTest : public DocDBTestBase {
void WriteQL(QLWriteRequestPB* ql_writereq_pb, const Schema& schema,
QLResponsePB* ql_writeresp_pb,
const HybridTime& hybrid_time = HybridTime::kMax) {
QLWriteOperation ql_write_op(schema, IndexMap(), kNonTransactionalOperationContext);
QLWriteOperation ql_write_op(schema, IndexMap(), nullptr /* unique_index_key_schema */,
kNonTransactionalOperationContext);
ASSERT_OK(ql_write_op.Init(ql_writereq_pb, ql_writeresp_pb));
auto doc_write_batch = MakeDocWriteBatch();
ASSERT_OK(ql_write_op.Apply(
Expand Down
73 changes: 41 additions & 32 deletions src/yb/docdb/doc_operation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1962,6 +1962,41 @@ Status QLWriteOperation::PopulateStatusRow(const DocOperationApplyData& data,
return Status::OK();
}

// Check if a duplicate value is inserted into a unique index.
Result<bool> QLWriteOperation::DuplicateUniqueIndexValue(const DocOperationApplyData& data) {
// If it is not an insert or it is not a unique index, this is not a duplicate insert.
if (request_.type() != QLWriteRequestPB::QL_STMT_INSERT || unique_index_key_schema_ == nullptr) {
return false;
}

// Set up the iterator to read the current primary key associated with the index key.
DocQLScanSpec spec(*unique_index_key_schema_, *pk_doc_key_, request_.query_id());
DocRowwiseIterator iterator(*unique_index_key_schema_, schema_, txn_op_context_,
data.doc_write_batch->doc_db(), data.deadline, data.read_time);
RETURN_NOT_OK(iterator.Init(spec));

// It is a duplicate value if the index key exist already and the associated indexed primary key
// is not the same.
if (!iterator.HasNext()) {
return false;
}
QLTableRow table_row;
RETURN_NOT_OK(iterator.NextRow(&table_row));
std::unordered_set<ColumnId> key_column_ids(unique_index_key_schema_->column_ids().begin(),
unique_index_key_schema_->column_ids().end());
for (const auto& column_value : request_.column_values()) {
ColumnId column_id(column_value.column_id());
if (key_column_ids.count(column_id) > 0) {
auto value = table_row.GetValue(column_id);
if (value && *value != column_value.expr().value()) {
return true;
}
}
}

return false;
}

Status QLWriteOperation::ApplyForJsonOperators(const QLColumnValuePB& column_value,
const DocOperationApplyData& data,
const DocPath& sub_path, const MonoDelta& ttl,
Expand Down Expand Up @@ -2161,6 +2196,12 @@ Status QLWriteOperation::Apply(const DocOperationApplyData& data) {
}
}

if (VERIFY_RESULT(DuplicateUniqueIndexValue(data))) {
response_->set_applied(false);
response_->set_status(QLResponsePB::YQL_STATUS_OK);
return Status::OK();
}

const MonoDelta ttl =
request_.has_ttl() ? MonoDelta::FromMilliseconds(request_.ttl()) : Value::kMaxTtl;
const UserTimeMicros user_timestamp = request_.has_user_timestamp_usec() ?
Expand Down Expand Up @@ -2405,29 +2446,6 @@ QLExpressionPB* NewKeyColumn(QLWriteRequestPB* request, const IndexInfo& index,
: request->add_range_column_values());
}

// Set primary-key condition, i.e. "h1 = xx AND h2 = xx AND r1 = xx AND r2 = xx ...", and add the
// referenced columns.
void SetPrimaryKeyCondition(const IndexInfo* index,
const Schema& indexed_schema,
const QLTableRow& new_row,
QLWriteRequestPB* index_request,
QLConditionPB* condition) {
condition->set_op(QL_OP_AND);
for (size_t i = 0; i < index->columns().size(); i++) {
const IndexInfo::IndexColumn& index_column = index->column(i);
if (indexed_schema.is_key_column(index_column.indexed_column_id)) {
auto result = new_row.GetValue(index_column.indexed_column_id);
if (result) {
QLConditionPB* column_condition = condition->add_operands()->mutable_condition();
column_condition->set_op(QL_OP_EQUAL);
column_condition->add_operands()->set_column_id(index_column.column_id);
*column_condition->add_operands()->mutable_value() = *result;
index_request->mutable_column_refs()->add_ids(index_column.column_id);
}
}
}
}

} // namespace

QLWriteRequestPB* QLWriteOperation::NewIndexRequest(const IndexInfo* index,
Expand All @@ -2436,15 +2454,6 @@ QLWriteRequestPB* QLWriteOperation::NewIndexRequest(const IndexInfo* index,
index_requests_.emplace_back(index, QLWriteRequestPB());
QLWriteRequestPB* request = &index_requests_.back().second;
request->set_type(type);
// For insert statement with unique index, add an "IF NOT EXISTS OR (primary key of the indexed
// table is the same)" condition to detect duplicate values.
if (type == QLWriteRequestPB::QL_STMT_INSERT && index->is_unique()) {
auto* condition = request->mutable_if_expr()->mutable_condition();
condition->set_op(QL_OP_OR);
condition->add_operands()->mutable_condition()->set_op(QL_OP_NOT_EXISTS);
SetPrimaryKeyCondition(index, schema_, new_row, request,
condition->add_operands()->mutable_condition());
}
return request;
}

Expand Down
5 changes: 5 additions & 0 deletions src/yb/docdb/doc_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,11 @@ class QLWriteOperation : public DocOperation, public DocExprExecutor {
public:
QLWriteOperation(const Schema& schema,
const IndexMap& index_map,
const Schema* unique_index_key_schema,
const TransactionOperationContextOpt& txn_op_context)
: schema_(schema),
index_map_(index_map),
unique_index_key_schema_(unique_index_key_schema),
txn_op_context_(txn_op_context)
{}

Expand Down Expand Up @@ -252,6 +254,8 @@ class QLWriteOperation : public DocOperation, public DocExprExecutor {
const QLTableRow& table_row,
std::unique_ptr<QLRowBlock>* rowblock);

Result<bool> DuplicateUniqueIndexValue(const DocOperationApplyData& data);

CHECKED_STATUS DeleteRow(const DocPath& row_path, DocWriteBatch* doc_write_batch);

bool IsRowDeleted(const QLTableRow& current_row, const QLTableRow& new_row) const;
Expand All @@ -264,6 +268,7 @@ class QLWriteOperation : public DocOperation, public DocExprExecutor {

const Schema& schema_;
const IndexMap& index_map_;
const Schema* unique_index_key_schema_ = nullptr;

// Doc key and doc path for hashed key (i.e. without range columns). Present when there is a
// static column being written.
Expand Down
3 changes: 3 additions & 0 deletions src/yb/master/async_rpc_tasks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,9 @@ AsyncCreateReplica::AsyncCreateReplica(Master *master,
req_.mutable_schema()->CopyFrom(table_lock->data().pb.schema());
req_.mutable_partition_schema()->CopyFrom(table_lock->data().pb.partition_schema());
req_.mutable_config()->CopyFrom(tablet_pb.committed_consensus_state().config());
if (table_lock->data().pb.has_index_info()) {
req_.mutable_index_info()->CopyFrom(table_lock->data().pb.index_info());
}
}

void AsyncCreateReplica::HandleResponse(int attempt) {
Expand Down
Loading

0 comments on commit baef7d0

Please sign in to comment.