Skip to content

Commit

Permalink
Merge branch 'master' into fix-schema-sync
Browse files Browse the repository at this point in the history
  • Loading branch information
Lloyd-Pottiger authored Mar 16, 2023
2 parents 39b05e5 + ffe4d81 commit ae228c4
Show file tree
Hide file tree
Showing 55 changed files with 911 additions and 745 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,6 @@ extern const int PAGE_SIZE_NOT_MATCH = 9006;
extern const int ILLFORMED_PAGE_NAME = 9007;
extern const int ILLFORMAT_RAFT_ROW = 9008;
extern const int REGION_DATA_SCHEMA_UPDATED = 9009;
extern const int REGION_EPOCH_NOT_MATCH = 9010;

extern const int LOCK_EXCEPTION = 10000;
extern const int VERSION_ERROR = 10001;
Expand All @@ -426,6 +425,8 @@ extern const int PTHREAD_ERROR = 10014;
extern const int PS_ENTRY_NOT_EXISTS = 10015;
extern const int PS_ENTRY_NO_VALID_VERSION = 10016;
extern const int PS_DIR_APPLY_INVALID_STATUS = 10017;
extern const int DISAGG_ESTABLISH_RETRYABLE_ERROR = 10018;
extern const int REGION_LOCKED = 10019;

extern const int S3_ERROR = 11000;
extern const int CANNOT_SCHEDULE_TASK = 11001;
Expand Down
17 changes: 9 additions & 8 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@

namespace DB
{
namespace ErrorCodes
{
extern const int REGION_EPOCH_NOT_MATCH;
} // namespace ErrorCodes

namespace FailPoints
{
Expand Down Expand Up @@ -319,7 +315,13 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
// and ask RN to send requests again with correct region info. When RN updates region info,
// RN may be sending requests to other WN.

throw Exception("Rejected disaggregated DAG execute because RN region info does not match", DB::ErrorCodes::REGION_EPOCH_NOT_MATCH);
RegionException::UnavailableRegions region_ids;
for (const auto & info : context.getDAGContext()->retry_regions)
region_ids.insert(info.region_id);

throw RegionException(
std::move(region_ids),
RegionException::RegionReadStatus::EPOCH_NOT_MATCH);
}

// A failpoint to test pause before alter lock released
Expand Down Expand Up @@ -820,9 +822,8 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max
const auto & snap_id = *dag_context.getDisaggTaskId();
auto timeout_s = context.getSettingsRef().disagg_task_snapshot_timeout;
auto expired_at = Clock::now() + std::chrono::seconds(timeout_s);
bool register_snapshot_ok = snaps->registerSnapshot(snap_id, std::move(disaggregated_snap), expired_at);
RUNTIME_CHECK_MSG(register_snapshot_ok, "disaggregated task has been registered {}", snap_id);
LOG_INFO(log, "task snapshot registered, snapshot_id={}", snap_id);
bool register_snapshot_ok = snaps->registerSnapshot(snap_id, disaggregated_snap, expired_at);
RUNTIME_CHECK_MSG(register_snapshot_ok, "Disaggregated task has been registered, snap_id={}", snap_id);
}

if (has_multiple_partitions)
Expand Down
25 changes: 7 additions & 18 deletions dbms/src/Flash/Disaggregated/MockS3LockClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,7 @@ class MockS3LockClient : public IS3LockClient
{
public:
explicit MockS3LockClient(std::shared_ptr<TiFlashS3Client> c)
: MockS3LockClient(c, c->bucket())
{
}

MockS3LockClient(std::shared_ptr<Aws::S3::S3Client> c, const String & bucket_)
: s3_client(std::move(c))
, bucket(bucket_)
{
}

Expand All @@ -45,16 +39,16 @@ class MockS3LockClient : public IS3LockClient
{
// If the data file exists and no delmark exists, then create a lock file on `data_file_key`
auto view = S3FilenameView::fromKey(data_file_key);
if (!objectExists(*s3_client, bucket, data_file_key))
if (!objectExists(*s3_client, data_file_key))
{
return {false, ""};
}
auto delmark_key = view.getDelMarkKey();
if (objectExists(*s3_client, bucket, delmark_key))
if (objectExists(*s3_client, delmark_key))
{
return {false, ""};
}
uploadEmptyFile(*s3_client, bucket, view.getLockKey(lock_store_id, lock_seq));
uploadEmptyFile(*s3_client, view.getLockKey(lock_store_id, lock_seq));
return {true, ""};
}

Expand All @@ -64,23 +58,18 @@ class MockS3LockClient : public IS3LockClient
// If there is no lock on the given `data_file_key`, then mark as deleted
auto view = S3FilenameView::fromKey(data_file_key);
auto lock_prefix = view.getLockPrefix();
bool any_lock_exist = false;
listPrefix(*s3_client, bucket, lock_prefix, [&any_lock_exist](const Aws::S3::Model::ListObjectsV2Result & result) -> S3::PageResult {
if (!result.GetContents().empty())
any_lock_exist = true;
return S3::PageResult{.num_keys = result.GetContents().size(), .more = false};
});
auto lock_key_opt = S3::anyKeyExistWithPrefix(*s3_client, lock_prefix);
bool any_lock_exist = lock_key_opt.has_value();
if (any_lock_exist)
{
return {false, ""};
}
uploadEmptyFile(*s3_client, bucket, view.getDelMarkKey());
uploadEmptyFile(*s3_client, view.getDelMarkKey());
return {true, ""};
}

private:
std::shared_ptr<Aws::S3::S3Client> s3_client;
String bucket;
std::shared_ptr<TiFlashS3Client> s3_client;
};

} // namespace DB::S3
34 changes: 6 additions & 28 deletions dbms/src/Flash/Disaggregated/S3LockService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ bool S3LockService::tryAddLockImpl(
auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
// make sure data file exists
auto object_key = key_view.isDMFile() ? fmt::format("{}/{}", data_file_key, DM::DMFile::metav2FileName()) : data_file_key;
if (!DB::S3::objectExists(*s3_client, s3_client->bucket(), object_key))
if (!DB::S3::objectExists(*s3_client, object_key))
{
auto * e = response->mutable_result()->mutable_conflict();
e->set_reason(fmt::format("data file not exist, key={}", data_file_key));
Expand All @@ -210,7 +210,7 @@ bool S3LockService::tryAddLockImpl(

// make sure data file is not marked as deleted
const auto delmark_key = key_view.getDelMarkKey();
if (DB::S3::objectExists(*s3_client, s3_client->bucket(), delmark_key))
if (DB::S3::objectExists(*s3_client, delmark_key))
{
auto * e = response->mutable_result()->mutable_conflict();
e->set_reason(fmt::format("data file is mark deleted, key={} delmark={}", data_file_key, delmark_key));
Expand All @@ -228,7 +228,7 @@ bool S3LockService::tryAddLockImpl(
}
const auto lock_key = key_view.getLockKey(lock_store_id, lock_seq);
// upload lock file
DB::S3::uploadEmptyFile(*s3_client, s3_client->bucket(), lock_key);
DB::S3::uploadEmptyFile(*s3_client, lock_key);
if (!gc_owner->isOwner())
{
// although the owner is changed after lock file is uploaded, but
Expand All @@ -244,28 +244,6 @@ bool S3LockService::tryAddLockImpl(
return true;
}

/// Lists the objects stored under `lock_prefix` in the shared TiFlash S3 client's bucket
/// and reports the key of the first object in the listing result handed to the callback.
///
/// @param lock_prefix  key prefix to list.
/// @return the first listed key, or std::nullopt when that listing result has no contents.
std::optional<String> S3LockService::anyLockExist(const String & lock_prefix)
{
    auto client = S3::ClientFactory::instance().sharedTiFlashClient();
    std::optional<String> first_key;

    // Callback invoked by listPrefix with a listing result; it records the first key (if any)
    // and sets `.more = false` because one hit is enough to answer the question.
    auto on_page = [&first_key](const Aws::S3::Model::ListObjectsV2Result & page) -> S3::PageResult {
        const auto & objects = page.GetContents();
        if (!objects.empty())
            first_key = objects.front().GetKey();
        return S3::PageResult{
            .num_keys = objects.size(),
            .more = false, // do not need more result
        };
    };

    DB::S3::listPrefix(*client, client->bucket(), lock_prefix, on_page);
    return first_key;
}

bool S3LockService::tryMarkDeleteImpl(const String & data_file_key, disaggregated::TryMarkDeleteResponse * response)
{
const S3FilenameView key_view = S3FilenameView::fromKey(data_file_key);
Expand Down Expand Up @@ -294,7 +272,8 @@ bool S3LockService::tryMarkDeleteImpl(const String & data_file_key, disaggregate

// make sure data file has not been locked
const auto lock_prefix = key_view.getLockPrefix();
std::optional<String> lock_key = anyLockExist(lock_prefix);
auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
std::optional<String> lock_key = S3::anyKeyExistWithPrefix(*s3_client, lock_prefix);
if (lock_key)
{
auto * e = response->mutable_result()->mutable_conflict();
Expand All @@ -318,8 +297,7 @@ bool S3LockService::tryMarkDeleteImpl(const String & data_file_key, disaggregate
{
tagging = TaggingObjectIsDeleted;
}
auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
DB::S3::uploadEmptyFile(*s3_client, s3_client->bucket(), delmark_key, tagging);
DB::S3::uploadEmptyFile(*s3_client, delmark_key, tagging);
if (!gc_owner->isOwner())
{
// owner changed happens when delmark is uploading, can not
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Flash/Disaggregated/S3LockService.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ class S3LockService final : private boost::noncopyable

DataFileMutexPtr getDataFileLatch(const String & data_file_key);

static std::optional<String> anyLockExist(const String & lock_prefix);

private:
std::unordered_map<String, DataFileMutexPtr> file_latch_map;
std::mutex file_latch_map_mutex;
Expand Down
10 changes: 4 additions & 6 deletions dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@

namespace DB
{
namespace ErrorCodes
{
extern const int REGION_EPOCH_NOT_MATCH;
} // namespace ErrorCodes

WNEstablishDisaggTaskHandler::WNEstablishDisaggTaskHandler(ContextPtr context_, const DM::DisaggTaskId & task_id)
: context(std::move(context_))
Expand All @@ -60,7 +56,9 @@ void WNEstablishDisaggTaskHandler::prepare(const disaggregated::EstablishDisaggT
context->setSetting("read_tso", start_ts);

if (request->timeout_s() < 0)
{
throw TiFlashException(Errors::Coprocessor::BadRequest, "invalid timeout={}", request->timeout_s());
}
else if (request->timeout_s() > 0)
{
context->setSetting("disagg_task_snapshot_timeout", request->timeout_s());
Expand Down Expand Up @@ -92,9 +90,9 @@ void WNEstablishDisaggTaskHandler::execute(disaggregated::EstablishDisaggTaskRes
response->set_store_id(kvstore->getStoreID());
}

auto snapshots = context->getSharedContextDisagg()->wn_snapshot_manager;
auto snaps = context->getSharedContextDisagg()->wn_snapshot_manager;
const auto & task_id = *dag_context->getDisaggTaskId();
auto snap = snapshots->getSnapshot(task_id);
auto snap = snaps->getSnapshot(task_id);
RUNTIME_CHECK_MSG(snap, "Snapshot was missing, task_id={}", task_id);

{
Expand Down
44 changes: 22 additions & 22 deletions dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class S3LockServiceTest

s3_client = client_factory.sharedTiFlashClient();
s3_lock_service = std::make_unique<DB::S3::S3LockService>(owner_manager);
::DB::tests::TiFlashTestEnv::createBucketIfNotExist(*s3_client, s3_client->bucket());
::DB::tests::TiFlashTestEnv::createBucketIfNotExist(*s3_client);
createS3DataFiles();
}
CATCH
Expand All @@ -66,7 +66,7 @@ class S3LockServiceTest
for (size_t i = 1; i <= 5; ++i)
{
auto data_filename = S3Filename::fromDMFileOID(DMFileOID{.store_id = store_id, .table_id = physical_table_id, .file_id = dm_file_id});
DB::S3::uploadEmptyFile(*s3_client, s3_client->bucket(), fmt::format("{}/{}", data_filename.toFullKey(), DM::DMFile::metav2FileName()));
DB::S3::uploadEmptyFile(*s3_client, fmt::format("{}/{}", data_filename.toFullKey(), DM::DMFile::metav2FileName()));
++dm_file_id;
}
}
Expand All @@ -78,7 +78,7 @@ class S3LockServiceTest
{
--dm_file_id;
auto data_filename = S3Filename::fromDMFileOID(DMFileOID{.store_id = store_id, .table_id = physical_table_id, .file_id = dm_file_id});
DB::S3::deleteObject(*s3_client, s3_client->bucket(), data_filename.toFullKey());
DB::S3::deleteObject(*s3_client, data_filename.toFullKey());
}
}

Expand Down Expand Up @@ -132,9 +132,9 @@ try

ASSERT_TRUE(status_code.ok()) << status_code.error_message();
ASSERT_TRUE(response.result().has_success()) << response.ShortDebugString();
ASSERT_TRUE(DB::S3::objectExists(*s3_client, s3_client->bucket(), lock_key));
ASSERT_TRUE(DB::S3::objectExists(*s3_client, lock_key));

DB::S3::deleteObject(*s3_client, s3_client->bucket(), lock_key);
DB::S3::deleteObject(*s3_client, lock_key);
}
CATCH

Expand All @@ -153,9 +153,9 @@ try

ASSERT_TRUE(status_code.ok()) << status_code.error_message();
ASSERT_TRUE(response.result().has_success()) << response.ShortDebugString();
ASSERT_TRUE(DB::S3::objectExists(*s3_client, s3_client->bucket(), delmark_key));
ASSERT_TRUE(DB::S3::objectExists(*s3_client, delmark_key));

DB::S3::deleteObject(*s3_client, s3_client->bucket(), delmark_key);
DB::S3::deleteObject(*s3_client, delmark_key);
}
CATCH

Expand All @@ -175,7 +175,7 @@ try
auto status_code = s3_lock_service->tryMarkDelete(&request, &response);

ASSERT_TRUE(status_code.ok()) << status_code.error_message();
ASSERT_TRUE(DB::S3::objectExists(*s3_client, s3_client->bucket(), delmark_key));
ASSERT_TRUE(DB::S3::objectExists(*s3_client, delmark_key));
}

// Try add lock file, should fail
Expand All @@ -190,10 +190,10 @@ try
ASSERT_TRUE(status_code.ok());
ASSERT_TRUE(!response.result().has_success()) << response.ShortDebugString();
ASSERT_TRUE(response.result().has_conflict()) << response.ShortDebugString();
ASSERT_TRUE(!DB::S3::objectExists(*s3_client, s3_client->bucket(), lock_key));
ASSERT_TRUE(!DB::S3::objectExists(*s3_client, lock_key));
}

DB::S3::deleteObject(*s3_client, s3_client->bucket(), delmark_key);
DB::S3::deleteObject(*s3_client, delmark_key);
}
CATCH

Expand All @@ -216,7 +216,7 @@ try
auto status_code = s3_lock_service->tryAddLock(&request, &response);

ASSERT_TRUE(status_code.ok());
ASSERT_TRUE(DB::S3::objectExists(*s3_client, s3_client->bucket(), lock_key));
ASSERT_TRUE(DB::S3::objectExists(*s3_client, lock_key));
}

// Try add delete mark, should fail
Expand All @@ -230,10 +230,10 @@ try
ASSERT_TRUE(status_code.ok());
ASSERT_TRUE(!response.result().has_success()) << response.ShortDebugString();
ASSERT_TRUE(response.result().has_conflict()) << response.ShortDebugString();
ASSERT_TRUE(!DB::S3::objectExists(*s3_client, s3_client->bucket(), delmark_key));
ASSERT_TRUE(!DB::S3::objectExists(*s3_client, delmark_key));
}

DB::S3::deleteObject(*s3_client, s3_client->bucket(), lock_key);
DB::S3::deleteObject(*s3_client, lock_key);
}
CATCH

Expand All @@ -257,7 +257,7 @@ try
ASSERT_TRUE(status_code.ok());
ASSERT_TRUE(!response.result().has_success()) << response.ShortDebugString();
ASSERT_TRUE(response.result().has_conflict()) << response.ShortDebugString();
ASSERT_TRUE(!DB::S3::objectExists(*s3_client, s3_client->bucket(), lock_key));
ASSERT_TRUE(!DB::S3::objectExists(*s3_client, lock_key));
}
}
CATCH
Expand All @@ -282,9 +282,9 @@ try

ASSERT_TRUE(status_code.ok());
ASSERT_TRUE(response.result().has_success()) << response.ShortDebugString();
ASSERT_TRUE(DB::S3::objectExists(*s3_client, s3_client->bucket(), lock_key));
ASSERT_TRUE(DB::S3::objectExists(*s3_client, lock_key));

DB::S3::deleteObject(*s3_client, s3_client->bucket(), lock_key);
DB::S3::deleteObject(*s3_client, lock_key);
}
};

Expand Down Expand Up @@ -316,7 +316,7 @@ try
auto status_code = s3_lock_service->tryMarkDelete(&request, &response);

ASSERT_TRUE(status_code.ok());
ASSERT_TRUE(DB::S3::objectExists(*s3_client, s3_client->bucket(), delmark_key));
ASSERT_TRUE(DB::S3::objectExists(*s3_client, delmark_key));
};

std::vector<std::thread> threads;
Expand All @@ -331,7 +331,7 @@ try
thread.join();
}

DB::S3::deleteObject(*s3_client, s3_client->bucket(), delmark_key);
DB::S3::deleteObject(*s3_client, delmark_key);
}
CATCH

Expand Down Expand Up @@ -390,13 +390,13 @@ try
auto delmark_key = data_filename.toView().getDelMarkKey();

// Either lock or delete file should exist
if (DB::S3::objectExists(*s3_client, s3_client->bucket(), delmark_key))
if (DB::S3::objectExists(*s3_client, delmark_key))
{
DB::S3::deleteObject(*s3_client, s3_client->bucket(), delmark_key);
DB::S3::deleteObject(*s3_client, delmark_key);
}
else if (DB::S3::objectExists(*s3_client, s3_client->bucket(), lock_key))
else if (DB::S3::objectExists(*s3_client, lock_key))
{
DB::S3::deleteObject(*s3_client, s3_client->bucket(), lock_key);
DB::S3::deleteObject(*s3_client, lock_key);
}
else
{
Expand Down
Loading

0 comments on commit ae228c4

Please sign in to comment.