Skip to content

Commit

Permalink
Improve behavior after exception in begin/end stream lumi
Browse files Browse the repository at this point in the history
  • Loading branch information
wddgit committed Apr 4, 2024
1 parent a471799 commit 8bc74f7
Show file tree
Hide file tree
Showing 14 changed files with 773 additions and 69 deletions.
75 changes: 40 additions & 35 deletions FWCore/Framework/interface/StreamSchedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
#include <sstream>
#include <atomic>
#include <unordered_set>
#include <utility>

namespace edm {

Expand Down Expand Up @@ -397,47 +398,51 @@ namespace edm {
typename T::TransitionInfoType& transitionInfo,
ServiceToken const& token,
bool cleaningUpAfterException) {
auto group = iHolder.group();
auto const& principal = transitionInfo.principal();
T::setStreamContext(streamContext_, principal);

auto id = principal.id();
ServiceWeakToken weakToken = token;
auto doneTask = make_waiting_task(
[this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
std::exception_ptr excpt;
if (iPtr) {
excpt = *iPtr;
//add context information to the exception and print message
try {
convertException::wrap([&]() { std::rethrow_exception(excpt); });
} catch (cms::Exception& ex) {
//TODO: should add the transition type info
std::ostringstream ost;
if (ex.context().empty()) {
ost << "Processing " << T::transitionName() << " " << id;
}
ServiceRegistry::Operate op(weakToken.lock());
addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
excpt = std::current_exception();
}

ServiceRegistry::Operate op(weakToken.lock());
actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
auto doneTask = make_waiting_task([this, iHolder = std::move(iHolder), id, cleaningUpAfterException, weakToken](
std::exception_ptr const* iPtr) mutable {
std::exception_ptr excpt;
if (iPtr) {
excpt = *iPtr;
//add context information to the exception and print message
try {
convertException::wrap([&]() { std::rethrow_exception(excpt); });
} catch (cms::Exception& ex) {
//TODO: should add the transition type info
std::ostringstream ost;
if (ex.context().empty()) {
ost << "Processing " << T::transitionName() << " " << id;
}
// Caught exception is propagated via WaitingTaskHolder
CMS_SA_ALLOW try {
ServiceRegistry::Operate op(weakToken.lock());
T::postScheduleSignal(actReg_.get(), &streamContext_);
} catch (...) {
if (not excpt) {
excpt = std::current_exception();
}
}
iHolder.doneWaiting(excpt);
});
ServiceRegistry::Operate op(weakToken.lock());
addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
excpt = std::current_exception();
}

CMS_SA_ALLOW try {
ServiceRegistry::Operate op(weakToken.lock());
actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
} catch (...) {
}
}
// Caught exception is propagated via WaitingTaskHolder
CMS_SA_ALLOW try {
ServiceRegistry::Operate op(weakToken.lock());
T::postScheduleSignal(actReg_.get(), &streamContext_);
} catch (...) {
if (not excpt) {
excpt = std::current_exception();
}
}
iHolder.doneWaiting(excpt);
});

auto task = make_functor_task(
[this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
auto task =
make_functor_task([this, h = WaitingTaskHolder(*group, doneTask), info = transitionInfo, weakToken]() mutable {
auto token = weakToken.lock();
ServiceRegistry::Operate op(token);
// Caught exception is propagated via WaitingTaskHolder
Expand Down Expand Up @@ -465,7 +470,7 @@ namespace edm {
//Enqueueing will start another thread if there is only
// one thread in the job. Having stream == 0 use spawn
// avoids starting up another thread when there is only one stream.
iHolder.group()->run([task]() {
group->run([task]() {
TaskSentry s{task};
task->execute();
});
Expand Down
6 changes: 4 additions & 2 deletions FWCore/Framework/interface/WorkerInPath.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include "FWCore/ServiceRegistry/interface/ParentContext.h"
#include "FWCore/ServiceRegistry/interface/PlaceInPathContext.h"

#include <utility>

namespace edm {

class PathContext;
Expand Down Expand Up @@ -116,7 +118,7 @@ namespace edm {

if constexpr (T::isEvent_) {
ParentContext parentContext(&placeInPathContext_);
worker_->doWorkAsync<T>(iTask, info, token, streamID, parentContext, context);
worker_->doWorkAsync<T>(std::move(iTask), info, token, streamID, parentContext, context);
} else {
ParentContext parentContext(context);

Expand All @@ -125,7 +127,7 @@ namespace edm {
// into the runs or lumis in stream transitions, so there can be
// no data dependencies which require prefetching. Prefetching is
// needed for global transitions, but they are run elsewhere.
worker_->doWorkNoPrefetchingAsync<T>(iTask, info, token, streamID, parentContext, context);
worker_->doWorkNoPrefetchingAsync<T>(std::move(iTask), info, token, streamID, parentContext, context);
}
}
} // namespace edm
Expand Down
106 changes: 88 additions & 18 deletions FWCore/Framework/interface/maker/Worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,7 @@ namespace edm {
bool ranAcquireWithoutException_;
bool moduleValid_ = true;
bool shouldTryToContinue_ = false;
bool beginSucceeded_ = false;
};

namespace {
Expand All @@ -633,14 +634,22 @@ namespace edm {
ModuleSignalSentry(ActivityRegistry* a,
typename T::Context const* context,
ModuleCallingContext const* moduleCallingContext)
: a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
if (a_)
T::preModuleSignal(a_, context, moduleCallingContext_);
}
: a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {}

~ModuleSignalSentry() {
if (a_)
T::postModuleSignal(a_, context_, moduleCallingContext_);
CMS_SA_ALLOW try {
if (a_) {
T::postModuleSignal(a_, context_, moduleCallingContext_);
}
} catch (...) {
}
}
// Sends the post-module signal for this sentry, if a registry is still held.
// The registry pointer is cleared *before* the signal is sent, so a later
// "if (a_)" check (such as the one in the destructor) finds nothing left to
// emit, even when the signal call itself throws.
void postModuleSignal() {
  if (not a_) {
    return;
  }
  auto registry = a_;
  a_ = nullptr;
  T::postModuleSignal(registry, context_, moduleCallingContext_);
}

private:
Expand Down Expand Up @@ -690,7 +699,12 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoBegin(info, mcc);
if (actReg) {
Arg::preModuleSignal(actReg, context, mcc);
}
auto returnValue = iWorker->implDoBegin(info, mcc);
cpp.postModuleSignal();
return returnValue;
}
static void esPrefetchAsync(Worker* worker,
WaitingTaskHolder waitingTask,
Expand All @@ -715,7 +729,12 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoStreamBegin(id, info, mcc);
if (actReg) {
Arg::preModuleSignal(actReg, context, mcc);
}
auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
cpp.postModuleSignal();
return returnValue;
}
static void esPrefetchAsync(Worker* worker,
WaitingTaskHolder waitingTask,
Expand All @@ -740,7 +759,12 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoEnd(info, mcc);
if (actReg) {
Arg::preModuleSignal(actReg, context, mcc);
}
auto returnValue = iWorker->implDoEnd(info, mcc);
cpp.postModuleSignal();
return returnValue;
}
static void esPrefetchAsync(Worker* worker,
WaitingTaskHolder waitingTask,
Expand All @@ -765,7 +789,12 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoStreamEnd(id, info, mcc);
if (actReg) {
Arg::preModuleSignal(actReg, context, mcc);
}
auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
cpp.postModuleSignal();
return returnValue;
}
static void esPrefetchAsync(Worker* worker,
WaitingTaskHolder waitingTask,
Expand All @@ -791,7 +820,12 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoBegin(info, mcc);
if (actReg) {
Arg::preModuleSignal(actReg, context, mcc);
}
auto returnValue = iWorker->implDoBegin(info, mcc);
cpp.postModuleSignal();
return returnValue;
}
static void esPrefetchAsync(Worker* worker,
WaitingTaskHolder waitingTask,
Expand All @@ -816,7 +850,13 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoStreamBegin(id, info, mcc);
if (actReg) {
Arg::preModuleSignal(actReg, context, mcc);
}
auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
cpp.postModuleSignal();
iWorker->beginSucceeded_ = true;
return returnValue;
}
static void esPrefetchAsync(Worker* worker,
WaitingTaskHolder waitingTask,
Expand All @@ -842,7 +882,12 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoEnd(info, mcc);
if (actReg) {
Arg::preModuleSignal(actReg, context, mcc);
}
auto returnValue = iWorker->implDoEnd(info, mcc);
cpp.postModuleSignal();
return returnValue;
}
static void esPrefetchAsync(Worker* worker,
WaitingTaskHolder waitingTask,
Expand All @@ -866,8 +911,18 @@ namespace edm {
ActivityRegistry* actReg,
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoStreamEnd(id, info, mcc);
if (iWorker->beginSucceeded_) {
iWorker->beginSucceeded_ = false;

ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
if (actReg) {
Arg::preModuleSignal(actReg, context, mcc);
}
auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
cpp.postModuleSignal();
return returnValue;
}
return true;
}
static void esPrefetchAsync(Worker* worker,
WaitingTaskHolder waitingTask,
Expand All @@ -892,7 +947,12 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoBeginProcessBlock(info.principal(), mcc);
if (actReg) {
Arg::preModuleSignal(actReg, context, mcc);
}
auto returnValue = iWorker->implDoBeginProcessBlock(info.principal(), mcc);
cpp.postModuleSignal();
return returnValue;
}
static void esPrefetchAsync(
Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
Expand All @@ -912,7 +972,12 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
if (actReg) {
Arg::preModuleSignal(actReg, context, mcc);
}
auto returnValue = iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
cpp.postModuleSignal();
return returnValue;
}
static void esPrefetchAsync(
Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
Expand All @@ -932,7 +997,12 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoEndProcessBlock(info.principal(), mcc);
if (actReg) {
Arg::preModuleSignal(actReg, context, mcc);
}
auto returnValue = iWorker->implDoEndProcessBlock(info.principal(), mcc);
cpp.postModuleSignal();
return returnValue;
}
static void esPrefetchAsync(
Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
Expand Down
3 changes: 0 additions & 3 deletions FWCore/Framework/src/EventProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1773,9 +1773,6 @@ namespace edm {
streamQueues_[i].pause();

auto& event = principalCache_.eventPrincipal(i);
//We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
// held by the container as this lambda may not finish executing before all the tasks it
// spawns have already started to run.
auto eventSetupImpls = &status->eventSetupImpls();
auto lp = status->lumiPrincipal().get();
streamLumiStatus_[i] = std::move(status);
Expand Down
Loading

0 comments on commit 8bc74f7

Please sign in to comment.