Skip to content

Commit

Permalink
Merge branch 'master' into support-late-materialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Lloyd-Pottiger authored Mar 16, 2023
2 parents 1e0dd9c + 86050b5 commit a77301a
Show file tree
Hide file tree
Showing 179 changed files with 5,425 additions and 1,491 deletions.
10 changes: 1 addition & 9 deletions contrib/tiflash-proxy-cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
option(ENABLE_UNI_PS_FFI "Enable write proxy data to uni ps" OFF)

if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG" OR SAN_DEBUG)
set(_TIFLASH_PROXY_BUILD_PROFILE "debug")
Expand All @@ -27,11 +26,6 @@ file(GLOB_RECURSE _TIFLASH_PROXY_SRCS "${_TIFLASH_PROXY_SOURCE_DIR}/*.rs")

# Build in the build directory instead of the default source directory
set(TIFLASH_RUST_ENV "CARGO_TARGET_DIR=${CMAKE_CURRENT_BINARY_DIR}" ${TIFLASH_RUST_ENV})
if(ENABLE_UNI_PS_FFI)
set(PROXY_ENABLE_FEATURES "ENABLE_FEATURES=raftstore-proxy/enable-pagestorage")
else()
set(PROXY_ENABLE_FEATURES "")
endif()

# use `CFLAGS=-w CXXFLAGS=-w` to inhibit warning messages.
if (TIFLASH_LLVM_TOOLCHAIN)
Expand Down Expand Up @@ -62,12 +56,10 @@ if(TIFLASH_LLVM_TOOLCHAIN AND USE_LIBCXX)
endif()

message(STATUS "Using rust env for tiflash-proxy: ${TIFLASH_RUST_ENV}")
message(STATUS "Using rust features for tiflash-proxy: ${PROXY_ENABLE_FEATURES}")
message(STATUS "Using rust command for tiflash-proxy: ${_TIFLASH_PROXY_MAKE_COMMAND}")

add_custom_command(OUTPUT ${_TIFLASH_PROXY_LIBRARY}
COMMENT "Building TiFlash Proxy using ${_TIFLASH_PROXY_BUILD_PROFILE} profile"
COMMAND ${CMAKE_COMMAND} -E env ${TIFLASH_RUST_ENV} ${PROXY_ENABLE_FEATURES} ${_TIFLASH_PROXY_MAKE_COMMAND}
COMMAND ${CMAKE_COMMAND} -E env ${TIFLASH_RUST_ENV} ${_TIFLASH_PROXY_MAKE_COMMAND}
VERBATIM
USES_TERMINAL
WORKING_DIRECTORY ${_TIFLASH_PROXY_SOURCE_DIR}
Expand Down
15 changes: 3 additions & 12 deletions dbms/src/Columns/ColumnString.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,20 +249,11 @@ class ColumnString final : public COWPtrHelper<IColumn, ColumnString>
{
const size_t string_size = *reinterpret_cast<const size_t *>(pos);
pos += sizeof(string_size);

if (likely(collator))
{
// https://github.com/pingcap/tiflash/pull/6135
// - Generate empty string column
// - Make size of `offsets` as previous way for func `ColumnString::size()`
offsets.push_back(0);
return pos + string_size;
}
if (likely(collator != nullptr))
insertData(pos, string_size);
else
{
insertDataWithTerminatingZero(pos, string_size);
return pos + string_size;
}
return pos + string_size;
}

void updateHashWithValue(size_t n, SipHash & hash, const TiDB::TiDBCollatorPtr & collator, String & sort_key_container) const override
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Columns/IColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ class IColumn : public COWPtr<IColumn>
* 2. The input parameter `collator` does not work well for complex columns(column tuple),
* but it is only used by TiDB , which does not support complex columns, so just ignore
* the complex column will be ok.
* 3. Even if the restored column will be discarded, deserializeAndInsertFromArena still needs to
* insert the data, because when a spill happens this column will be used during the merge agg stage.
*/
virtual const char * deserializeAndInsertFromArena(const char * pos, const TiDB::TiDBCollatorPtr & collator) = 0;
const char * deserializeAndInsertFromArena(const char * pos) { return deserializeAndInsertFromArena(pos, nullptr); }
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,6 @@ extern const int PAGE_SIZE_NOT_MATCH = 9006;
extern const int ILLFORMED_PAGE_NAME = 9007;
extern const int ILLFORMAT_RAFT_ROW = 9008;
extern const int REGION_DATA_SCHEMA_UPDATED = 9009;
extern const int REGION_EPOCH_NOT_MATCH = 9010;

extern const int LOCK_EXCEPTION = 10000;
extern const int VERSION_ERROR = 10001;
Expand All @@ -426,6 +425,8 @@ extern const int PTHREAD_ERROR = 10014;
extern const int PS_ENTRY_NOT_EXISTS = 10015;
extern const int PS_ENTRY_NO_VALID_VERSION = 10016;
extern const int PS_DIR_APPLY_INVALID_STATUS = 10017;
extern const int DISAGG_ESTABLISH_RETRYABLE_ERROR = 10018;
extern const int REGION_LOCKED = 10019;

extern const int S3_ERROR = 11000;
extern const int CANNOT_SCHEDULE_TASK = 11001;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ namespace DB
M(exception_in_merged_task_init) \
M(invalid_mpp_version) \
M(force_fail_in_flush_region_data) \
M(force_use_dmfile_format_v3) \
M(force_set_mocked_s3_object_mtime)


Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Common/StringUtils/StringUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ inline bool isUpperAlphaASCII(char c)
return (c >= 'A' && c <= 'Z');
}

/// Tells whether `c` is an ASCII lowercase letter, i.e. lies in the range 'a'..'z'.
/// Only the raw `char` value is compared, so no locale is consulted.
/// NOTE: the spelling of the function name is kept as-is because callers use it.
inline bool isLowerAplhaASCII(char c)
{
    if (c < 'a')
        return false;
    return c <= 'z';
}

inline bool isAlphaASCII(char c)
{
return (c >= 'a' && c <= 'z')
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ namespace DB
M(tiflash_schema_version, "Current version of tiflash cached schema", Gauge) \
M(tiflash_schema_applying, "Whether the schema is applying or not (holding lock)", Gauge) \
M(tiflash_schema_apply_count, "Total number of each kinds of apply", Counter, F(type_diff, {"type", "diff"}), \
F(type_full, {"type", "full"}), F(type_failed, {"type", "failed"})) \
F(type_full, {"type", "full"}), F(type_failed, {"type", "failed"}), \
F(type_drop_keyspace, {"type", "drop_keyspace"})) \
M(tiflash_schema_trigger_count, "Total number of each kinds of schema sync trigger", Counter, /**/ \
F(type_timer, {"type", "timer"}), F(type_raft_decode, {"type", "raft_decode"}), F(type_cop_read, {"type", "cop_read"})) \
M(tiflash_schema_internal_ddl_count, "Total number of each kinds of internal ddl operations", Counter, \
Expand Down
11 changes: 7 additions & 4 deletions dbms/src/Common/UniThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,6 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
}
}


template class ThreadPoolImpl<std::thread>;
template class ThreadPoolImpl<ThreadFromGlobalPoolImpl<false>>;
template class ThreadFromGlobalPoolImpl<true>;
Expand All @@ -355,6 +354,11 @@ void GlobalThreadPool::initialize(size_t max_threads, size_t max_free_threads, s
the_instance.reset(new GlobalThreadPool(max_threads, max_free_threads, queue_size, false /*shutdown_on_exception*/));
}

/// Registers a callback to be run by the GlobalThreadPool destructor.
///
/// Callbacks are appended to `finalize_fns`, so they are kept in registration order.
/// `fn` is a by-value sink parameter; it is moved into the list to avoid copying
/// the callable (and whatever state it captured) a second time.
/// NOTE(review): no locking is done here — presumably this is only called during
/// single-threaded startup; confirm against callers.
void GlobalThreadPool::registerFinalizer(std::function<void()> fn)
{
    finalize_fns.push_back(std::move(fn));
}

GlobalThreadPool & GlobalThreadPool::instance()
{
if (!the_instance)
Expand All @@ -369,9 +373,8 @@ GlobalThreadPool & GlobalThreadPool::instance()

GlobalThreadPool::~GlobalThreadPool() noexcept
{
// We must make sure IOThread is released before GlobalThreadPool runs
// `finalize`. Or the threads in GlobalThreadPool never ends.
IOThreadPool::shutdown();
for (auto & fn : finalize_fns)
fn();
}

} // namespace DB
52 changes: 29 additions & 23 deletions dbms/src/Common/UniThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,14 @@ class GlobalThreadPool : public FreeThreadPool
: FreeThreadPool(max_threads_, max_free_threads_, queue_size_, shutdown_on_exception_)
{}

std::vector<std::function<void()>> finalize_fns;

public:
static void initialize(size_t max_threads = 10000, size_t max_free_threads = 1000, size_t queue_size = 10000);
static GlobalThreadPool & instance();

void registerFinalizer(std::function<void()>);

~GlobalThreadPool() noexcept;
};

Expand All @@ -187,29 +192,30 @@ class ThreadFromGlobalPoolImpl : boost::noncopyable
/// NOTE:
/// - If this will throw an exception, the destructor won't be called
/// - this pointer cannot be passed in the lambda, since after detach() it will not be valid
GlobalThreadPool::instance().scheduleOrThrow([state = state,
func = std::forward<Function>(func),
args = std::make_tuple(std::forward<Args>(args)...)]() mutable /// mutable is needed to destroy capture
{
SCOPE_EXIT(
state->thread_id = std::thread::id();
state->event.set(););

state->thread_id = std::this_thread::get_id();

/// This moves are needed to destroy function and arguments before exit.
/// It will guarantee that after ThreadFromGlobalPool::join all captured params are destroyed.
auto function = std::move(func);
auto arguments = std::move(args);

/// Thread status holds raw pointer on query context, thus it always must be destroyed
/// before sending signal that permits to join this thread.
// DB::ThreadStatus thread_status;
std::apply(function, arguments);
},
0, // default priority
0, // default wait_microseconds
propagate_opentelemetry_context);
GlobalThreadPool::instance().scheduleOrThrow(
[state = state,
func = std::forward<Function>(func),
args = std::make_tuple(std::forward<Args>(args)...)]() mutable /// mutable is needed to destroy capture
{
SCOPE_EXIT(
state->thread_id = std::thread::id();
state->event.set(););

state->thread_id = std::this_thread::get_id();

/// This moves are needed to destroy function and arguments before exit.
/// It will guarantee that after ThreadFromGlobalPool::join all captured params are destroyed.
auto function = std::move(func);
auto arguments = std::move(args);

/// Thread status holds raw pointer on query context, thus it always must be destroyed
/// before sending signal that permits to join this thread.
// DB::ThreadStatus thread_status;
std::apply(function, arguments);
},
0, // default priority
0, // default wait_microseconds
propagate_opentelemetry_context);
}

ThreadFromGlobalPoolImpl(ThreadFromGlobalPoolImpl && rhs) noexcept
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Core/TiFlashDisaggregatedMode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ DisaggregatedMode getDisaggregatedMode(const Poco::Util::LayeredConfiguration &
if (config.has(config_key))
{
std::string mode_str = config.getString(config_key);
RUNTIME_ASSERT(mode_str == DISAGGREGATED_MODE_WRITE || mode_str == DISAGGREGATED_MODE_COMPUTE,
RUNTIME_ASSERT(mode_str == DISAGGREGATED_MODE_WRITE
|| mode_str == DISAGGREGATED_MODE_STORAGE // backward compatibility
|| mode_str == DISAGGREGATED_MODE_COMPUTE,
"Expect disaggregated_mode is {} or {}, got: {}",
DISAGGREGATED_MODE_WRITE,
DISAGGREGATED_MODE_COMPUTE,
Expand All @@ -45,7 +47,7 @@ DisaggregatedMode getDisaggregatedMode(const Poco::Util::LayeredConfiguration &
bool useAutoScaler(const Poco::Util::LayeredConfiguration & config)
{
static const std::string autoscaler_config_key = "flash.use_autoscaler";
bool use_autoscaler = true;
bool use_autoscaler = false;
if (config.has(autoscaler_config_key))
use_autoscaler = config.getBool(autoscaler_config_key);
return use_autoscaler;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Core/TiFlashDisaggregatedMode.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
// Note that the TiFlash Write Node is also called the TiFlash Storage Node in many places.
// To stay consistent with our documents, we had better stick with "tiflash_write" in the configurations.
#define DISAGGREGATED_MODE_WRITE "tiflash_write"
#define DISAGGREGATED_MODE_STORAGE "tiflash_storage" // backward compatibility just for parsing config
#define DISAGGREGATED_MODE_COMPUTE "tiflash_compute"
// engine_role determine whether TiFlash use S3 to write.
#define DISAGGREGATED_MODE_WRITE_ENGINE_ROLE "write"
Expand Down
33 changes: 33 additions & 0 deletions dbms/src/Debug/MockRaftStoreProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,39 @@ MockProxyRegion::MockProxyRegion(uint64_t id_)
state.mutable_region()->set_id(id);
}

/// Builds a write batch holding this mock region's metadata.
///
/// Two pages are put into the returned batch, in this order:
///  1. `state` (region local state), under the key produced by
///     `UniversalPageIdFormat::toRegionLocalStateKeyInKVEngine(id)`;
///  2. `apply` (raft apply state), under the key produced by
///     `UniversalPageIdFormat::toRaftApplyStateKeyInKVEngine(id)`.
/// Each value is the protobuf message serialized with `SerializeAsString()`.
/// The guard returned by `genLockGuard()` is held for the whole call.
///
/// @return the write batch containing both pages; nothing is applied to storage here.
UniversalWriteBatch MockProxyRegion::persistMeta()
{
    auto _ = genLockGuard();
    auto wb = UniversalWriteBatch();

    // Copies `payload` into an in-memory write buffer and adds it to `wb` as page `key`.
    const auto put_serialized = [&wb](const auto & key, const std::string & payload) {
        MemoryWriteBuffer buf(0, payload.size());
        buf.write(payload.data(), payload.size());
        wb.putPage(UniversalPageId(key.data(), key.size()), 0, buf.tryGetReadBuffer(), payload.size());
    };

    const auto region_key = UniversalPageIdFormat::toRegionLocalStateKeyInKVEngine(this->id);
    const auto region_local_state = this->state.SerializeAsString();
    put_serialized(region_key, region_local_state);

    const auto apply_key = UniversalPageIdFormat::toRaftApplyStateKeyInKVEngine(this->id);
    const auto raft_apply_state = this->apply.SerializeAsString();
    put_serialized(apply_key, raft_apply_state);

    return wb;
}

/// Appends one peer entry to the peer list of the region stored in `state`.
///
/// @param store_id  value written to the new peer's store id
/// @param peer_id   value written to the new peer's id
/// @param role      value written to the new peer's role
/// The guard returned by `genLockGuard()` is held while `state` is modified.
void MockProxyRegion::addPeer(uint64_t store_id, uint64_t peer_id, metapb::PeerRole role)
{
    auto _ = genLockGuard();
    auto * peer_list = state.mutable_region()->mutable_peers();
    auto * new_peer = peer_list->Add();
    new_peer->set_store_id(store_id);
    new_peer->set_id(peer_id);
    new_peer->set_role(role);
}

std::optional<kvrpcpb::ReadIndexResponse> RawMockReadIndexTask::poll(std::shared_ptr<MockAsyncNotifier> waker)
{
auto _ = genLockGuard();
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Debug/MockRaftStoreProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Storages/Page/V3/Universal/UniversalWriteBatchImpl.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/ReadIndexWorker.h>
#include <kvproto/raft_serverpb.pb.h>
Expand All @@ -36,6 +37,9 @@ struct MockProxyRegion : MutexLockWrap
void updateCommitIndex(uint64_t index);
void setSate(raft_serverpb::RegionLocalState);
explicit MockProxyRegion(uint64_t id);
UniversalWriteBatch persistMeta();
void addPeer(uint64_t store_id, uint64_t peer_id, metapb::PeerRole role);

struct RawWrite
{
std::vector<std::string> keys;
Expand Down
17 changes: 9 additions & 8 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@

namespace DB
{
namespace ErrorCodes
{
extern const int REGION_EPOCH_NOT_MATCH;
} // namespace ErrorCodes

namespace FailPoints
{
Expand Down Expand Up @@ -305,7 +301,13 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
// and ask RN to send requests again with correct region info. When RN updates region info,
// RN may be sending requests to other WN.

throw Exception("Rejected disaggregated DAG execute because RN region info does not match", DB::ErrorCodes::REGION_EPOCH_NOT_MATCH);
RegionException::UnavailableRegions region_ids;
for (const auto & info : context.getDAGContext()->retry_regions)
region_ids.insert(info.region_id);

throw RegionException(
std::move(region_ids),
RegionException::RegionReadStatus::EPOCH_NOT_MATCH);
}

// A failpoint to test pause before alter lock released
Expand Down Expand Up @@ -800,9 +802,8 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max
const auto & snap_id = *dag_context.getDisaggTaskId();
auto timeout_s = context.getSettingsRef().disagg_task_snapshot_timeout;
auto expired_at = Clock::now() + std::chrono::seconds(timeout_s);
bool register_snapshot_ok = snaps->registerSnapshot(snap_id, std::move(disaggregated_snap), expired_at);
RUNTIME_CHECK_MSG(register_snapshot_ok, "disaggregated task has been registered {}", snap_id);
LOG_INFO(log, "task snapshot registered, snapshot_id={}", snap_id);
bool register_snapshot_ok = snaps->registerSnapshot(snap_id, disaggregated_snap, expired_at);
RUNTIME_CHECK_MSG(register_snapshot_ok, "Disaggregated task has been registered, snap_id={}", snap_id);
}

if (has_multiple_partitions)
Expand Down
Loading

0 comments on commit a77301a

Please sign in to comment.