Skip to content

Commit

Permalink
resolve the life-cycle race between a slotImpl and callbacks pointing…
Browse files Browse the repository at this point in the history
… to it.

Signed-off-by: Xin Zhuang <[email protected]>
  • Loading branch information
stevenzzzz committed Sep 3, 2019
1 parent b28edca commit a896a66
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 14 deletions.
10 changes: 10 additions & 0 deletions include/envoy/thread_local/thread_local.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ class Slot {
*/
using InitializeCb = std::function<ThreadLocalObjectSharedPtr(Event::Dispatcher& dispatcher)>;
virtual void set(InitializeCb cb) PURE;

// UpdateCb takes the current stored data, and returns an updated/new version data.
// TLS will run the callback and replace the stored data with the returned value *in each thread*.
// NOTE: The update callback is not supposed to capture the Slot, or its owner. As the owner may
// be destructed in main thread before the update_cb gets called in a worker thread.
//
using UpdateCb = std::function<ThreadLocalObjectSharedPtr(ThreadLocalObjectSharedPtr)>;
virtual void runOnAllThreads(const UpdateCb&& update_cb) PURE;

virtual void runOnAllThreads(const UpdateCb&& update_cb, Event::PostCb complete_cb) PURE;
};

using SlotPtr = std::unique_ptr<Slot>;
Expand Down
14 changes: 14 additions & 0 deletions source/common/config/config_provider_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ ConfigSubscriptionCommonBase::~ConfigSubscriptionCommonBase() {
init_target_.ready();
config_provider_manager_.unbindSubscription(manager_identifier_);
}

void ConfigSubscriptionCommonBase::applyConfigUpdate(const ConfigUpdateCb& update_fn,
const Event::PostCb& complete_cb) {
tls_->runOnAllThreads(
[update_fn](ThreadLocal::ThreadLocalObjectSharedPtr previous)
-> ThreadLocal::ThreadLocalObjectSharedPtr {
auto prev_thread_local_config = std::dynamic_pointer_cast<ThreadLocalConfig>(previous);
ASSERT(prev_thread_local_config != nullptr);
prev_thread_local_config->config_ = update_fn(prev_thread_local_config->config_);
return previous;
},
complete_cb);
}

bool ConfigSubscriptionInstance::checkAndApplyConfigUpdate(const Protobuf::Message& config_proto,
const std::string& config_name,
const std::string& version_info) {
Expand Down
11 changes: 1 addition & 10 deletions source/common/config/config_provider_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,16 +219,7 @@ class ConfigSubscriptionCommonBase : protected Logger::Loggable<Logger::Id::conf
* @param complete_cb the callback to run when the update propagation is done.
*/
void applyConfigUpdate(
const ConfigUpdateCb& update_fn, const Event::PostCb& complete_cb = []() {}) {
tls_->runOnAllThreads(
[this, update_fn]() {
// NOTE: there is a known race condition between *this* subscription being teared down in
// main thread and the posted callback being executed before the destruction. See more
// details in https://github.com/envoyproxy/envoy/issues/7902
tls_->getTyped<ThreadLocalConfig>().config_ = update_fn(getConfig());
},
complete_cb);
}
const ConfigUpdateCb& update_fn, const Event::PostCb& complete_cb = []() {});

void setLastUpdated() { last_updated_ = time_source_.systemTime(); }

Expand Down
7 changes: 5 additions & 2 deletions source/common/router/rds_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,11 @@ Router::ConfigConstSharedPtr RdsRouteConfigProviderImpl::config() {
void RdsRouteConfigProviderImpl::onConfigUpdate() {
ConfigConstSharedPtr new_config(
new ConfigImpl(config_update_info_->routeConfiguration(), factory_context_, false));
tls_->runOnAllThreads(
[this, new_config]() -> void { tls_->getTyped<ThreadLocalConfig>().config_ = new_config; });
tls_->runOnAllThreads([new_config](ThreadLocal::ThreadLocalObjectSharedPtr previous)
-> ThreadLocal::ThreadLocalObjectSharedPtr {
static_cast<ThreadLocalConfig*>(previous.get())->config_ = new_config;
return previous;
});
}

void RdsRouteConfigProviderImpl::validateConfig(
Expand Down
69 changes: 67 additions & 2 deletions source/common/thread_local/thread_local_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ SlotPtr InstanceImpl::allocateSlot() {

if (free_slot_indexes_.empty()) {
std::unique_ptr<SlotImpl> slot(new SlotImpl(*this, slots_.size()));
slots_.push_back(slot.get());
return slot;
auto wrapper = std::make_unique<Bookkeeper>(*this, std::move(slot));
slots_.push_back(&wrapper->slot());
return wrapper;
}
const uint32_t idx = free_slot_indexes_.front();
free_slot_indexes_.pop_front();
Expand All @@ -41,12 +42,50 @@ SlotPtr InstanceImpl::allocateSlot() {
bool InstanceImpl::SlotImpl::currentThreadRegistered() {
return thread_local_data_.data_.size() > index_;
}
void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb&& cb) {
parent_.runOnAllThreads([this, cb = std::move(cb)]() { setThreadLocal(index_, cb(get())); });
}

void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb&& cb, Event::PostCb complete_cb) {
parent_.runOnAllThreads([this, cb = std::move(cb)]() { setThreadLocal(index_, cb(get())); },
complete_cb);
}

ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() {
ASSERT(currentThreadRegistered());
return thread_local_data_.data_[index_];
}

InstanceImpl::Bookkeeper::Bookkeeper(InstanceImpl& parent, std::unique_ptr<SlotImpl>&& slot)
: parent_(parent), holder_(std::make_unique<SlotHolder>(std::move(slot))) {}

ThreadLocalObjectSharedPtr InstanceImpl::Bookkeeper::get() { return slot().get(); }
void InstanceImpl::Bookkeeper::runOnAllThreads(const UpdateCb&& cb, Event::PostCb complete_cb) {
slot().runOnAllThreads([cb = std::move(cb), ref_count = holder_->ref_count_](
ThreadLocalObjectSharedPtr previous) { return cb(previous); },
complete_cb);
}
void InstanceImpl::Bookkeeper::runOnAllThreads(const UpdateCb&& cb) {
slot().runOnAllThreads([cb = std::move(cb), ref_count = holder_->ref_count_](
ThreadLocalObjectSharedPtr previous) { return cb(previous); });
}
bool InstanceImpl::Bookkeeper::currentThreadRegistered() {
return slot().currentThreadRegistered();
}
void InstanceImpl::Bookkeeper::runOnAllThreads(Event::PostCb cb) {
// Use holder_.ref_count_ to bookkeep how many on-the-fly callback are out there.
slot().runOnAllThreads([cb, ref_count = holder_->ref_count_]() { cb(); });
}
void InstanceImpl::Bookkeeper::runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) {
// Use holder_.ref_count_ to bookkeep how many on-the-fly callback are out there.
slot().runOnAllThreads([cb, main_callback, ref_count = holder_->ref_count_]() { cb(); },
main_callback);
}
void InstanceImpl::Bookkeeper::set(InitializeCb cb) {
slot().set([cb, ref_count = holder_->ref_count_](Event::Dispatcher& dispatcher)
-> ThreadLocalObjectSharedPtr { return cb(dispatcher); });
}

void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) {
ASSERT(std::this_thread::get_id() == main_thread_id_);
ASSERT(!shutdown_);
Expand All @@ -60,6 +99,32 @@ void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_threa
dispatcher.post([&dispatcher] { thread_local_data_.dispatcher_ = &dispatcher; });
}
}
void InstanceImpl::recycle(std::unique_ptr<SlotHolder>&& holder) {
if (holder->isRecycleable()) {
holder.reset();
return;
}
deferred_deletes_.emplace_back(std::move(holder));
scheduleCleanup();
}

void InstanceImpl::scheduleCleanup() {
ASSERT(main_thread_dispatcher_ != nullptr);
if (shutdown_) {
return;
}
if (deferred_deletes_.empty()) {
return;
}
main_thread_dispatcher_->post([this]() {
deferred_deletes_.remove_if(
[](std::unique_ptr<SlotHolder>& holder) -> bool { return holder->isRecycleable(); });
if (!deferred_deletes_.empty()) {
// Recursively.
scheduleCleanup();
}
});
}

void InstanceImpl::removeSlot(SlotImpl& slot) {
ASSERT(std::this_thread::get_id() == main_thread_id_);
Expand Down
46 changes: 46 additions & 0 deletions source/common/thread_local/thread_local_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,17 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
SlotImpl(InstanceImpl& parent, uint64_t index) : parent_(parent), index_(index) {}
~SlotImpl() override { parent_.removeSlot(*this); }

// non copyable, non moveable.
SlotImpl(const SlotImpl&) = delete;
SlotImpl(SlotImpl&&) = delete;
SlotImpl& operator=(const SlotImpl&) = delete;
SlotImpl& operator=(SlotImpl&&) = delete;

// ThreadLocal::Slot
ThreadLocalObjectSharedPtr get() override;
bool currentThreadRegistered() override;
void runOnAllThreads(const UpdateCb&& cb) override;
void runOnAllThreads(const UpdateCb&& cb, Event::PostCb complete_cb) override;
void runOnAllThreads(Event::PostCb cb) override { parent_.runOnAllThreads(cb); }
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override {
parent_.runOnAllThreads(cb, main_callback);
Expand All @@ -45,17 +53,55 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
const uint64_t index_;
};

// A helper class for holding a SlotImpl and its bookkeeping shared_ptr which counts the number of
// update callbacks on-the-fly.
struct SlotHolder {
SlotHolder(std::unique_ptr<SlotImpl>&& slot) : slot_(std::move(slot)) {}
bool isRecycleable() { return ref_count_.use_count() == 1; }

std::unique_ptr<SlotImpl> slot_;
std::shared_ptr<int> ref_count_{new int(0)};
};

// A Wrapper of SlotImpl which on destruction returns the SlotImpl to the deferred delete queue
// (detaches it).
struct Bookkeeper : public Slot {
Bookkeeper(InstanceImpl& parent, std::unique_ptr<SlotImpl>&& slot);
~Bookkeeper() { parent_.recycle(std::move(holder_)); }
SlotImpl& slot() { return *(holder_->slot_); }

// ThreadLocal::Slot
ThreadLocalObjectSharedPtr get() override;
void runOnAllThreads(const UpdateCb&& cb) override;
void runOnAllThreads(const UpdateCb&& cb, Event::PostCb complete_cb) override;
bool currentThreadRegistered() override;
void runOnAllThreads(Event::PostCb cb) override;
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override;
void set(InitializeCb cb) override;

InstanceImpl& parent_;
std::unique_ptr<SlotHolder> holder_;
};

struct ThreadLocalData {
Event::Dispatcher* dispatcher_{};
std::vector<ThreadLocalObjectSharedPtr> data_;
};

void recycle(std::unique_ptr<SlotHolder>&& holder);
// Cleanup the deferred deletes queue.
void scheduleCleanup();
void removeSlot(SlotImpl& slot);
void runOnAllThreads(Event::PostCb cb);
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback);
static void setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object);

static thread_local ThreadLocalData thread_local_data_;

// A queue for Slots that has to be deferred to delete due to out-going callbacks
// pointing to the Slot.
std::list<std::unique_ptr<SlotHolder>> deferred_deletes_;

std::vector<SlotImpl*> slots_;
// A list of index of freed slots.
std::list<uint32_t> free_slot_indexes_;
Expand Down
33 changes: 33 additions & 0 deletions test/common/thread_local/thread_local_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,39 @@ TEST_F(ThreadLocalInstanceImplTest, All) {
tls_.shutdownThread();
}

TEST_F(ThreadLocalInstanceImplTest, UpdateCallback) {
InSequence s;

SlotPtr slot = tls_.allocateSlot();

auto newer_version = std::make_shared<TestThreadLocalObject>();
bool update_called = false;

TestThreadLocalObject& object_ref = setObject(*slot);
auto update_cb = [&object_ref, &update_called,
newer_version](ThreadLocalObjectSharedPtr obj) -> ThreadLocalObjectSharedPtr {
// The unit test setup have two dispatchers registered, but only one thread, this lambda will be
// called twice in the same thread.
if (!update_called) {
EXPECT_EQ(obj.get(), &object_ref);
update_called = true;
} else {
EXPECT_EQ(obj.get(), newer_version.get());
}

return newer_version;
};
EXPECT_CALL(thread_dispatcher_, post(_));
EXPECT_CALL(object_ref, onDestroy());
EXPECT_CALL(*newer_version, onDestroy());
slot->runOnAllThreads(update_cb);

EXPECT_EQ(newer_version.get(), &slot->getTyped<TestThreadLocalObject>());

tls_.shutdownGlobalThreading();
tls_.shutdownThread();
}

// TODO(ramaraochavali): Run this test with real threads. The current issue in the unit
// testing environment is, the post to main_dispatcher is not working as expected.

Expand Down
10 changes: 10 additions & 0 deletions test/mocks/thread_local/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ class MockInstance : public Instance {
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override {
parent_.runOnAllThreads(cb, main_callback);
}
void runOnAllThreads(const UpdateCb&& cb) override {
parent_.runOnAllThreads(
[cb = std::move(cb), this]() { parent_.data_[index_] = cb(parent_.data_[index_]); });
}
void runOnAllThreads(const UpdateCb&& cb, Event::PostCb main_callback) override {
parent_.runOnAllThreads(
[cb = std::move(cb), this]() { parent_.data_[index_] = cb(parent_.data_[index_]); },
main_callback);
}

void set(InitializeCb cb) override { parent_.data_[index_] = cb(parent_.dispatcher_); }

MockInstance& parent_;
Expand Down

0 comments on commit a896a66

Please sign in to comment.