From 80853bc053eab14e465b7e2871a949894e2aef2f Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Tue, 2 Jul 2013 15:23:21 -0500 Subject: [PATCH 1/4] Allow SubProcess to also create an instance since in the future SubProcess will manage multiple streams just like EventProcessor. --- FWCore/Utilities/interface/StreamID.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/FWCore/Utilities/interface/StreamID.h b/FWCore/Utilities/interface/StreamID.h index b36c5a9f94c93..a84c429385306 100644 --- a/FWCore/Utilities/interface/StreamID.h +++ b/FWCore/Utilities/interface/StreamID.h @@ -26,6 +26,7 @@ // forward declarations namespace edm { class EventProcessor; + class SubProcess; class StreamID { @@ -50,8 +51,9 @@ namespace edm { } private: - ///Only a Schedule is allowed to create one of these + ///Only a Schedule or a SubProcess is allowed to create one of these friend class EventProcessor; + friend class SubProcess; explicit StreamID(unsigned int iValue) : value_(iValue) {} StreamID() = delete; From cd17f2bdb71336b62185708e51f8b7f1186b189b Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Tue, 2 Jul 2013 15:25:31 -0500 Subject: [PATCH 2/4] Propagated StreamID throughout the APIs --- FWCore/Framework/interface/Schedule.h | 37 +++++++------------ FWCore/Framework/interface/ScheduleItems.h | 4 +- .../interface/UnscheduledCallProducer.h | 6 +-- FWCore/Framework/src/EventProcessor.cc | 2 +- FWCore/Framework/src/Path.h | 7 ++-- FWCore/Framework/src/Schedule.cc | 4 +- FWCore/Framework/src/ScheduleItems.cc | 6 ++- FWCore/Framework/src/SubProcess.cc | 2 +- FWCore/Framework/src/Worker.h | 10 +++-- FWCore/Framework/src/WorkerInPath.h | 7 ++-- 10 files changed, 43 insertions(+), 42 deletions(-) diff --git a/FWCore/Framework/interface/Schedule.h b/FWCore/Framework/interface/Schedule.h index 9f6da6c308658..9112607d532f6 100644 --- a/FWCore/Framework/interface/Schedule.h +++ b/FWCore/Framework/interface/Schedule.h @@ -78,6 +78,7 @@ #include "FWCore/Utilities/interface/BranchType.h" #include "FWCore/Utilities/interface/ConvertException.h" #include "FWCore/Utilities/interface/Exception.h" +#include "FWCore/Utilities/interface/StreamID.h" #include "boost/shared_ptr.hpp" @@ -121,7 +122,8 @@ namespace edm { ActionTable const& actions, boost::shared_ptr areg, boost::shared_ptr processConfiguration, - const ParameterSet* subProcPSet); + const ParameterSet* subProcPSet, + StreamID streamID); enum State { Ready = 0, Running, Latched }; @@ -329,29 +331,12 @@ namespace edm { RunStopwatch::StopwatchPointer stopwatch_; boost::shared_ptr unscheduled_; + + StreamID streamID_; volatile bool endpathsAreActive_; }; - // ----------------------------- - // ProcessOneOccurrence is a functor that has bound a specific - // Principal and Event Setup, and can be called with a Path, to - // execute Path::processOneOccurrence for that event - - template - class ProcessOneOccurrence { - public: - typedef void result_type; - ProcessOneOccurrence(typename T::MyPrincipal& principal, EventSetup const& setup) : - ep(principal), es(setup) {}; - - void operator()(Path& p) {p.processOneOccurrence(ep, es);} - - private: - typename T::MyPrincipal& ep; - EventSetup const& es; - }; - void inline Schedule::reportSkipped(EventPrincipal const& ep) const { @@ -381,7 +366,7 @@ namespace edm { try { try { //make sure the unscheduled items see this transition [Event will be a no-op] - unscheduled_->runNow(ep, es); + unscheduled_->runNow(ep, es, streamID_); if (runTriggerPaths(ep, es)) { if (T::isEvent_) ++total_passed_; } @@ -400,7 +385,7 @@ namespace edm { try { CPUTimer timer; - if (results_inserter_.get()) results_inserter_->doWork(ep, es, nullptr, &timer); + if (results_inserter_.get()) results_inserter_->doWork(ep, es, nullptr, &timer,streamID_); } catch (cms::Exception & ex) { if (T::isEvent_) { @@ -438,7 +423,9 @@ namespace edm { template bool Schedule::runTriggerPaths(typename T::MyPrincipal& ep, EventSetup const& es) { - for_all(trig_paths_, ProcessOneOccurrence(ep, es)); + for(auto& p : trig_paths_) { + p.processOneOccurrence(ep, es, streamID_); + } return results_->accept(); } @@ -447,7 +434,9 @@ namespace edm { Schedule::runEndPaths(typename T::MyPrincipal& ep, EventSetup const& es) { // Note there is no state-checking safety controlling the // activation/deactivation of endpaths. - for_all(end_paths_, ProcessOneOccurrence(ep, es)); + for(auto& p : end_paths_) { + p.processOneOccurrence(ep, es, streamID_); + } // We could get rid of the functor ProcessOneOccurrence if we used // boost::lambda, but the use of lambda with member functions diff --git a/FWCore/Framework/interface/ScheduleItems.h b/FWCore/Framework/interface/ScheduleItems.h index d03aa89d29b59..83b9da25761b0 100644 --- a/FWCore/Framework/interface/ScheduleItems.h +++ b/FWCore/Framework/interface/ScheduleItems.h @@ -20,6 +20,7 @@ namespace edm { class ProductRegistry; class Schedule; class SignallingProductRegistry; + class StreamID; struct ScheduleItems { ScheduleItems(); @@ -44,7 +45,8 @@ namespace edm { std::auto_ptr initSchedule(ParameterSet& parameterSet, - ParameterSet const* subProcessPSet); + ParameterSet const* subProcessPSet, + StreamID streamID); void clear(); diff --git a/FWCore/Framework/interface/UnscheduledCallProducer.h b/FWCore/Framework/interface/UnscheduledCallProducer.h index 11835700cbe93..eacaa596a9bff 100644 --- a/FWCore/Framework/interface/UnscheduledCallProducer.h +++ b/FWCore/Framework/interface/UnscheduledCallProducer.h @@ -22,7 +22,7 @@ namespace edm { } template - void runNow(typename T::MyPrincipal& p, EventSetup const& es) { + void runNow(typename T::MyPrincipal& p, EventSetup const& es, StreamID streamID) { //do nothing for event since we will run when requested if(!T::isEvent_) { for(std::map::iterator it = labelToWorkers_.begin(), itEnd=labelToWorkers_.end(); @@ -30,7 +30,7 @@ namespace edm { ++it) { CPUTimer timer; try { - it->second->doWork(p, es, nullptr, &timer); + it->second->doWork(p, es, nullptr, &timer,streamID); } catch (cms::Exception & ex) { std::ostringstream ost; @@ -75,7 +75,7 @@ namespace edm { if(itFound != labelToWorkers_.end()) { CPUTimer timer; try { - itFound->second->doWork >(event, eventSetup, iContext, &timer); + itFound->second->doWork >(event, eventSetup, iContext, &timer,event.streamID()); } catch (cms::Exception & ex) { std::ostringstream ost; diff --git a/FWCore/Framework/src/EventProcessor.cc b/FWCore/Framework/src/EventProcessor.cc index da20cf43189b4..0ee754f4c4973 100644 --- a/FWCore/Framework/src/EventProcessor.cc +++ b/FWCore/Framework/src/EventProcessor.cc @@ -646,7 +646,7 @@ namespace edm { input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.actReg_, items.processConfiguration_); // intialize the Schedule - schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get()); + schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get(),StreamID{0}); // set the data members act_table_ = std::move(items.act_table_); diff --git a/FWCore/Framework/src/Path.h b/FWCore/Framework/src/Path.h index a7b56dbf9fa4c..f82ab63c2934f 100644 --- a/FWCore/Framework/src/Path.h +++ b/FWCore/Framework/src/Path.h @@ -35,6 +35,7 @@ namespace edm { class RunPrincipal; class LuminosityBlockPrincipal; class EarlyDeleteHelper; + class StreamID; class Path { public: @@ -52,7 +53,7 @@ namespace edm { bool isEndPath); template - void processOneOccurrence(typename T::MyPrincipal&, EventSetup const&); + void processOneOccurrence(typename T::MyPrincipal&, EventSetup const&, StreamID const&); int bitPosition() const { return bitpos_; } std::string const& name() const { return name_; } @@ -154,7 +155,7 @@ namespace edm { } template - void Path::processOneOccurrence(typename T::MyPrincipal& ep, EventSetup const& es) { + void Path::processOneOccurrence(typename T::MyPrincipal& ep, EventSetup const& es, StreamID const& streamID) { //Create the PathSignalSentry before the RunStopwatch so that // we only record the time spent in the path not from the signal @@ -184,7 +185,7 @@ namespace edm { try { try { cpc.activate(idx, i->getWorker()->descPtr()); - should_continue = i->runWorker(ep, es, &cpc); + should_continue = i->runWorker(ep, es, &cpc, streamID); } catch (cms::Exception& e) { throw; } catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); } diff --git a/FWCore/Framework/src/Schedule.cc b/FWCore/Framework/src/Schedule.cc index 21553df267e74..e76df6911cf5f 100644 --- a/FWCore/Framework/src/Schedule.cc +++ b/FWCore/Framework/src/Schedule.cc @@ -285,7 +285,8 @@ namespace edm { ActionTable const& actions, boost::shared_ptr areg, boost::shared_ptr processConfiguration, - const ParameterSet* subProcPSet) : + const ParameterSet* subProcPSet, + StreamID streamID) : worker_reg_(areg), act_table_(&actions), actReg_(areg), @@ -304,6 +305,7 @@ namespace edm { total_passed_(), stopwatch_(wantSummary_? new RunStopwatch::StopwatchPointer::element_type : static_cast (nullptr)), unscheduled_(new UnscheduledCallProducer), + streamID_(streamID), endpathsAreActive_(true) { ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet()); diff --git a/FWCore/Framework/src/ScheduleItems.cc b/FWCore/Framework/src/ScheduleItems.cc index 64293f4180128..2419717858e29 100644 --- a/FWCore/Framework/src/ScheduleItems.cc +++ b/FWCore/Framework/src/ScheduleItems.cc @@ -125,7 +125,8 @@ namespace edm { std::auto_ptr ScheduleItems::initSchedule(ParameterSet& parameterSet, - ParameterSet const* subProcessPSet) { + ParameterSet const* subProcessPSet, + StreamID streamID) { std::auto_ptr schedule( new Schedule(parameterSet, ServiceRegistry::instance().get(), @@ -134,7 +135,8 @@ namespace edm { *act_table_, actReg_, processConfiguration_, - subProcessPSet)); + subProcessPSet, + streamID)); return schedule; } diff --git a/FWCore/Framework/src/SubProcess.cc b/FWCore/Framework/src/SubProcess.cc index ce40945529292..8c287c73cb3ac 100644 --- a/FWCore/Framework/src/SubProcess.cc +++ b/FWCore/Framework/src/SubProcess.cc @@ -108,7 +108,7 @@ namespace edm { esp_ = esController.makeProvider(*processParameterSet_); // intialize the Schedule - schedule_ = items.initSchedule(*processParameterSet_,subProcessParameterSet.get()); + schedule_ = items.initSchedule(*processParameterSet_,subProcessParameterSet.get(),StreamID{0}); // set the items act_table_ = std::move(items.act_table_); diff --git a/FWCore/Framework/src/Worker.h b/FWCore/Framework/src/Worker.h index 7f0700b28f953..4c0f68660d9f5 100644 --- a/FWCore/Framework/src/Worker.h +++ b/FWCore/Framework/src/Worker.h @@ -31,6 +31,7 @@ the worker is reset(). #include "FWCore/Utilities/interface/Exception.h" #include "FWCore/Utilities/interface/ConvertException.h" #include "FWCore/Utilities/interface/BranchType.h" +#include "FWCore/Utilities/interface/StreamID.h" #include "boost/shared_ptr.hpp" @@ -43,6 +44,7 @@ namespace edm { class EventPrincipal; class EarlyDeleteHelper; class ProductHolderIndexHelper; + class StreamID; class Worker { public: @@ -57,8 +59,9 @@ namespace edm { template bool doWork(typename T::MyPrincipal&, EventSetup const& c, - CurrentProcessingContext const* cpc, - CPUTimer *const timer); + CurrentProcessingContext const* cpc, + CPUTimer *const timer, + StreamID stream); void beginJob() ; void endJob(); void respondToOpenInputFile(FileBlock const& fb) {implRespondToOpenInputFile(fb);} @@ -215,7 +218,8 @@ namespace edm { bool Worker::doWork(typename T::MyPrincipal& ep, EventSetup const& es, CurrentProcessingContext const* cpc, - CPUTimer* const iTimer) { + CPUTimer* const iTimer, + StreamID streamID) { // A RunStopwatch, but only if we are processing an event. RunDualStopwatches stopwatch(T::isEvent_ ? stopwatch_ : RunStopwatch::StopwatchPointer(), diff --git a/FWCore/Framework/src/WorkerInPath.h b/FWCore/Framework/src/WorkerInPath.h index d94d3ff79627a..8ec77ad499100 100644 --- a/FWCore/Framework/src/WorkerInPath.h +++ b/FWCore/Framework/src/WorkerInPath.h @@ -16,6 +16,7 @@ #include "FWCore/Framework/src/RunStopwatch.h" namespace edm { + class StreamID; class WorkerInPath { public: @@ -26,7 +27,7 @@ namespace edm { template bool runWorker(typename T::MyPrincipal&, EventSetup const&, - CurrentProcessingContext const* cpc); + CurrentProcessingContext const* cpc, StreamID streamID); std::pair timeCpuReal() const { if(stopwatch_) { @@ -62,7 +63,7 @@ namespace edm { template bool WorkerInPath::runWorker(typename T::MyPrincipal & ep, EventSetup const & es, - CurrentProcessingContext const* cpc) { + CurrentProcessingContext const* cpc, StreamID streamID) { if (T::isEvent_) { ++timesVisited_; @@ -73,7 +74,7 @@ namespace edm { // may want to change the return value from the worker to be // the Worker::FilterAction so conditions in the path will be easier to // identify - rc = worker_->doWork(ep, es, cpc,stopwatch_.get()); + rc = worker_->doWork(ep, es, cpc,stopwatch_.get(),streamID); // Ignore return code for non-event (e.g. run, lumi) calls if (!T::isEvent_) rc = true; From 03d880c1a64ad799b25df24518e3411203c8c8f1 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Fri, 5 Jul 2013 09:11:55 -0500 Subject: [PATCH 3/4] Added stream transitions In addition to adding stream transitions we renamed to old transitions to be 'global'. For the worker code I use an adapter to choose the appropriate virtual function to call rather than the old way of using function overloading. The code compiles and all tests work however the stream transitions are not yet communicated to modules. --- FWCore/Framework/interface/BranchActionType.h | 6 +- FWCore/Framework/interface/OccurrenceTraits.h | 144 ++++++++++++++++-- .../interface/UnscheduledCallProducer.h | 2 +- FWCore/Framework/src/EventProcessor.cc | 58 ++++++- FWCore/Framework/src/SubProcess.cc | 10 +- FWCore/Framework/src/Worker.h | 123 ++++++++++++--- FWCore/Framework/src/WorkerT.cc | 35 ++++- FWCore/Framework/src/WorkerT.h | 18 ++- 8 files changed, 337 insertions(+), 59 deletions(-) diff --git a/FWCore/Framework/interface/BranchActionType.h b/FWCore/Framework/interface/BranchActionType.h index 0cfe5eeef4acc..a7e8eb0e04d44 100644 --- a/FWCore/Framework/interface/BranchActionType.h +++ b/FWCore/Framework/interface/BranchActionType.h @@ -10,8 +10,10 @@ BranchActionType: BranchAction namespace edm { enum BranchActionType { - BranchActionBegin = 0, - BranchActionEnd = 1 + BranchActionGlobalBegin = 0, + BranchActionStreamBegin = 1, + BranchActionStreamEnd = 2, + BranchActionGlobalEnd = 3 }; } #endif diff --git a/FWCore/Framework/interface/OccurrenceTraits.h b/FWCore/Framework/interface/OccurrenceTraits.h index 5524a64ca343a..0a01b9f7b9e59 100644 --- a/FWCore/Framework/interface/OccurrenceTraits.h +++ b/FWCore/Framework/interface/OccurrenceTraits.h @@ -23,7 +23,7 @@ namespace edm { template class OccurrenceTraits; template <> - class OccurrenceTraits { + class OccurrenceTraits { public: typedef EventPrincipal MyPrincipal; static BranchType const branchType_ = InEvent; @@ -51,7 +51,7 @@ namespace edm { }; template <> - class OccurrenceTraits { + class OccurrenceTraits { public: typedef RunPrincipal MyPrincipal; static BranchType const branchType_ = InRun; @@ -79,63 +79,175 @@ namespace edm { }; template <> - class OccurrenceTraits { + class OccurrenceTraits { + public: + typedef RunPrincipal MyPrincipal; + static BranchType const branchType_ = InRun; + static bool const begin_ = true; + static bool const isEvent_ = false; + static void preScheduleSignal(ActivityRegistry *a, RunPrincipal const* ep) { + //a->preBeginRunSignal_(ep->id(), ep->beginTime()); + } + static void postScheduleSignal(ActivityRegistry *a, RunPrincipal* ep, EventSetup const* es) { + Run run(*ep, ModuleDescription()); + //a->postBeginRunSignal_(run, *es); + } + static void prePathSignal(ActivityRegistry *a, std::string const& s) { + //a->prePathBeginRunSignal_(s); + } + static void postPathSignal(ActivityRegistry *a, std::string const& s, HLTPathStatus const& status) { + //a->postPathBeginRunSignal_(s, status); + } + static void preModuleSignal(ActivityRegistry *a, ModuleDescription const* md) { + //a->preModuleBeginRunSignal_(*md); + } + static void postModuleSignal(ActivityRegistry *a, ModuleDescription const* md) { + //a->postModuleBeginRunSignal_(*md); + } + }; + + template <> + class OccurrenceTraits { public: typedef RunPrincipal MyPrincipal; static BranchType const branchType_ = InRun; static bool const begin_ = false; static bool const isEvent_ = false; static void preScheduleSignal(ActivityRegistry *a, RunPrincipal const* ep) { - a->preEndRunSignal_(ep->id(), ep->endTime()); + //a->preEndRunSignal_(ep->id(), ep->endTime()); + } + static void postScheduleSignal(ActivityRegistry *a, RunPrincipal* ep, EventSetup const* es) { + //Run run(*ep, ModuleDescription()); + //a->postEndRunSignal_(run, *es); + } + static void prePathSignal(ActivityRegistry *a, std::string const& s) { + //a->prePathEndRunSignal_(s); + } + static void postPathSignal(ActivityRegistry *a, std::string const& s, HLTPathStatus const& status) { + //a->postPathEndRunSignal_(s, status); + } + static void preModuleSignal(ActivityRegistry *a, ModuleDescription const* md) { + //a->preModuleEndRunSignal_(*md); + } + static void postModuleSignal(ActivityRegistry *a, ModuleDescription const* md) { + //a->postModuleEndRunSignal_(*md); + } + }; + + template <> + class OccurrenceTraits { + public: + typedef RunPrincipal MyPrincipal; + static BranchType const branchType_ = InRun; + static bool const begin_ = false; + static bool const isEvent_ = false; + static void preScheduleSignal(ActivityRegistry *a, RunPrincipal const* ep) { + a->preEndRunSignal_(ep->id(), ep->endTime()); } static void postScheduleSignal(ActivityRegistry *a, RunPrincipal* ep, EventSetup const* es) { Run run(*ep, ModuleDescription()); a->postEndRunSignal_(run, *es); } static void prePathSignal(ActivityRegistry *a, std::string const& s) { - a->prePathEndRunSignal_(s); + a->prePathEndRunSignal_(s); } static void postPathSignal(ActivityRegistry *a, std::string const& s, HLTPathStatus const& status) { - a->postPathEndRunSignal_(s, status); + a->postPathEndRunSignal_(s, status); } static void preModuleSignal(ActivityRegistry *a, ModuleDescription const* md) { - a->preModuleEndRunSignal_(*md); + a->preModuleEndRunSignal_(*md); } static void postModuleSignal(ActivityRegistry *a, ModuleDescription const* md) { - a->postModuleEndRunSignal_(*md); + a->postModuleEndRunSignal_(*md); } }; - + template <> - class OccurrenceTraits { + class OccurrenceTraits { public: typedef LuminosityBlockPrincipal MyPrincipal; static BranchType const branchType_ = InLumi; static bool const begin_ = true; static bool const isEvent_ = false; static void preScheduleSignal(ActivityRegistry *a, LuminosityBlockPrincipal const* ep) { - a->preBeginLumiSignal_(ep->id(), ep->beginTime()); + a->preBeginLumiSignal_(ep->id(), ep->beginTime()); } static void postScheduleSignal(ActivityRegistry *a, LuminosityBlockPrincipal* ep, EventSetup const* es) { LuminosityBlock lumi(*ep, ModuleDescription()); a->postBeginLumiSignal_(lumi, *es); } static void prePathSignal(ActivityRegistry *a, std::string const& s) { - a->prePathBeginLumiSignal_(s); + a->prePathBeginLumiSignal_(s); + } + static void postPathSignal(ActivityRegistry *a, std::string const& s, HLTPathStatus const& status) { + a->postPathBeginLumiSignal_(s, status); + } + static void preModuleSignal(ActivityRegistry *a, ModuleDescription const* md) { + a->preModuleBeginLumiSignal_(*md); + } + static void postModuleSignal(ActivityRegistry *a, ModuleDescription const* md) { + a->postModuleBeginLumiSignal_(*md); + } + }; + + template <> + class OccurrenceTraits { + public: + typedef LuminosityBlockPrincipal MyPrincipal; + static BranchType const branchType_ = InLumi; + static bool const begin_ = true; + static bool const isEvent_ = false; + static void preScheduleSignal(ActivityRegistry *a, LuminosityBlockPrincipal const* ep) { + //a->preBeginLumiSignal_(ep->id(), ep->beginTime()); + } + static void postScheduleSignal(ActivityRegistry *a, LuminosityBlockPrincipal* ep, EventSetup const* es) { + //LuminosityBlock lumi(*ep, ModuleDescription()); + //a->postBeginLumiSignal_(lumi, *es); + } + static void prePathSignal(ActivityRegistry *a, std::string const& s) { + //a->prePathBeginLumiSignal_(s); + } + static void postPathSignal(ActivityRegistry *a, std::string const& s, HLTPathStatus const& status) { + //a->postPathBeginLumiSignal_(s, status); + } + static void preModuleSignal(ActivityRegistry *a, ModuleDescription const* md) { + //a->preModuleBeginLumiSignal_(*md); + } + static void postModuleSignal(ActivityRegistry *a, ModuleDescription const* md) { + //a->postModuleBeginLumiSignal_(*md); + } + }; + + template <> + class OccurrenceTraits { + public: + typedef LuminosityBlockPrincipal MyPrincipal; + static BranchType const branchType_ = InLumi; + static bool const begin_ = false; + static bool const isEvent_ = false; + static void preScheduleSignal(ActivityRegistry *a, LuminosityBlockPrincipal const* ep) { + //a->preEndLumiSignal_(ep->id(), ep->beginTime()); + } + static void postScheduleSignal(ActivityRegistry *a, LuminosityBlockPrincipal* ep, EventSetup const* es) { + //LuminosityBlock lumi(*ep, ModuleDescription()); + //a->postEndLumiSignal_(lumi, *es); + } + static void prePathSignal(ActivityRegistry *a, std::string const& s) { + //a->prePathEndLumiSignal_(s); } static void postPathSignal(ActivityRegistry *a, std::string const& s, HLTPathStatus const& status) { - a->postPathBeginLumiSignal_(s, status); + //a->postPathEndLumiSignal_(s, status); } static void preModuleSignal(ActivityRegistry *a, ModuleDescription const* md) { - a->preModuleBeginLumiSignal_(*md); + //a->preModuleEndLumiSignal_(*md); } static void postModuleSignal(ActivityRegistry *a, ModuleDescription const* md) { - a->postModuleBeginLumiSignal_(*md); + //a->postModuleEndLumiSignal_(*md); } }; template <> - class OccurrenceTraits { + class OccurrenceTraits { public: typedef LuminosityBlockPrincipal MyPrincipal; static BranchType const branchType_ = InLumi; diff --git a/FWCore/Framework/interface/UnscheduledCallProducer.h b/FWCore/Framework/interface/UnscheduledCallProducer.h index eacaa596a9bff..a14f25494d0cc 100644 --- a/FWCore/Framework/interface/UnscheduledCallProducer.h +++ b/FWCore/Framework/interface/UnscheduledCallProducer.h @@ -75,7 +75,7 @@ namespace edm { if(itFound != labelToWorkers_.end()) { CPUTimer timer; try { - itFound->second->doWork >(event, eventSetup, iContext, &timer,event.streamID()); + itFound->second->doWork >(event, eventSetup, iContext, &timer,event.streamID()); } catch (cms::Exception & ex) { std::ostringstream ost; diff --git a/FWCore/Framework/src/EventProcessor.cc b/FWCore/Framework/src/EventProcessor.cc index 0ee754f4c4973..cdf05ad65f0aa 100644 --- a/FWCore/Framework/src/EventProcessor.cc +++ b/FWCore/Framework/src/EventProcessor.cc @@ -1989,7 +1989,7 @@ namespace edm { looper_->doStartingNewLoop(); } { - typedef OccurrenceTraits Traits; + typedef OccurrenceTraits Traits; ScheduleSignalSentry sentry(actReg_.get(), &runPrincipal, &es); schedule_->processOneOccurrence(runPrincipal, es); if(hasSubProcess()) { @@ -2000,6 +2000,18 @@ namespace edm { if(looper_) { looper_->doBeginRun(runPrincipal, es); } + { + typedef OccurrenceTraits Traits; + ScheduleSignalSentry sentry(actReg_.get(), &runPrincipal, &es); + schedule_->processOneOccurrence(runPrincipal, es); + if(hasSubProcess()) { + //subProcess_->doStreamBeginRun(StreamID{0}, runPrincipal, ts); + } + } + FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n"; + if(looper_) { + //looper_->doStreamBeginRun(StreamID{0},runPrincipal, es); + } } void EventProcessor::endRun(statemachine::Run const& run, bool cleaningUpAfterException) { @@ -2010,7 +2022,19 @@ namespace edm { espController_->eventSetupForInstance(ts); EventSetup const& es = esp_->eventSetup(); { - typedef OccurrenceTraits Traits; + typedef OccurrenceTraits Traits; + ScheduleSignalSentry sentry(actReg_.get(), &runPrincipal, &es); + schedule_->processOneOccurrence(runPrincipal, es, cleaningUpAfterException); + if(hasSubProcess()) { + //subProcess_->doStreamEndRun(runPrincipal, ts, cleaningUpAfterException); + } + } + FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n"; + if(looper_) { + //looper_->doStreamEndRun(runPrincipal, es); + } + { + typedef OccurrenceTraits Traits; ScheduleSignalSentry sentry(actReg_.get(), &runPrincipal, &es); schedule_->processOneOccurrence(runPrincipal, es, cleaningUpAfterException); if(hasSubProcess()) { @@ -2039,7 +2063,7 @@ namespace edm { espController_->eventSetupForInstance(ts); EventSetup const& es = esp_->eventSetup(); { - typedef OccurrenceTraits Traits; + typedef OccurrenceTraits Traits; ScheduleSignalSentry sentry(actReg_.get(), &lumiPrincipal, &es); schedule_->processOneOccurrence(lumiPrincipal, es); if(hasSubProcess()) { @@ -2050,6 +2074,18 @@ namespace edm { if(looper_) { looper_->doBeginLuminosityBlock(lumiPrincipal, es); } + { + typedef OccurrenceTraits Traits; + ScheduleSignalSentry sentry(actReg_.get(), &lumiPrincipal, &es); + schedule_->processOneOccurrence(lumiPrincipal, es); + if(hasSubProcess()) { + //subProcess_->doStreamBeginLuminosityBlock(lumiPrincipal, ts); + } + } + FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n"; + if(looper_) { + //looper_->doStreamBeginLuminosityBlock(lumiPrincipal, es); + } } void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) { @@ -2062,7 +2098,19 @@ namespace edm { espController_->eventSetupForInstance(ts); EventSetup const& es = esp_->eventSetup(); { - typedef OccurrenceTraits Traits; + typedef OccurrenceTraits Traits; + ScheduleSignalSentry sentry(actReg_.get(), &lumiPrincipal, &es); + schedule_->processOneOccurrence(lumiPrincipal, es, cleaningUpAfterException); + if(hasSubProcess()) { + //subProcess_->doStreamEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); + } + } + FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n"; + if(looper_) { + //looper_->doStreamEndLuminosityBlock(lumiPrincipal, es); + } + { + typedef OccurrenceTraits Traits; ScheduleSignalSentry sentry(actReg_.get(), &lumiPrincipal, &es); schedule_->processOneOccurrence(lumiPrincipal, es, cleaningUpAfterException); if(hasSubProcess()) { @@ -2154,7 +2202,7 @@ namespace edm { espController_->eventSetupForInstance(ts); EventSetup const& es = esp_->eventSetup(); { - typedef OccurrenceTraits Traits; + typedef OccurrenceTraits Traits; ScheduleSignalSentry sentry(actReg_.get(), pep, &es); schedule_->processOneOccurrence(*pep, es); if(hasSubProcess()) { diff --git a/FWCore/Framework/src/SubProcess.cc b/FWCore/Framework/src/SubProcess.cc index 8c287c73cb3ac..a93f79f2ec407 100644 --- a/FWCore/Framework/src/SubProcess.cc +++ b/FWCore/Framework/src/SubProcess.cc @@ -203,7 +203,7 @@ namespace edm { principal.reader()); ep.setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr()); propagateProducts(InEvent, principal, ep); - typedef OccurrenceTraits Traits; + typedef OccurrenceTraits Traits; schedule_->processOneOccurrence(ep, esInfo_->es_); if(subProcess_.get()) subProcess_->doEvent(ep, esInfo_->ts_); ep.clearEventPrincipal(); @@ -234,7 +234,7 @@ namespace edm { RunPrincipal& rp = *principalCache_.runPrincipalPtr(); propagateProducts(InRun, principal, rp); - typedef OccurrenceTraits Traits; + typedef OccurrenceTraits Traits; schedule_->processOneOccurrence(rp, esInfo_->es_); if(subProcess_.get()) subProcess_->doBeginRun(rp, esInfo_->ts_); } @@ -253,7 +253,7 @@ namespace edm { SubProcess::endRun(RunPrincipal const& principal) { RunPrincipal& rp = *principalCache_.runPrincipalPtr(); propagateProducts(InRun, principal, rp); - typedef OccurrenceTraits Traits; + typedef OccurrenceTraits Traits; schedule_->processOneOccurrence(rp, esInfo_->es_, cleaningUpAfterException_); if(subProcess_.get()) subProcess_->doEndRun(rp, esInfo_->ts_, cleaningUpAfterException_); } @@ -294,7 +294,7 @@ namespace edm { principalCache_.insert(lbpp); LuminosityBlockPrincipal& lbp = *principalCache_.lumiPrincipalPtr(); propagateProducts(InLumi, principal, lbp); - typedef OccurrenceTraits Traits; + typedef OccurrenceTraits Traits; schedule_->processOneOccurrence(lbp, esInfo_->es_); if(subProcess_.get()) subProcess_->doBeginLuminosityBlock(lbp, esInfo_->ts_); } @@ -313,7 +313,7 @@ namespace edm { SubProcess::endLuminosityBlock(LuminosityBlockPrincipal const& principal) { LuminosityBlockPrincipal& lbp = *principalCache_.lumiPrincipalPtr(); propagateProducts(InLumi, principal, lbp); - typedef OccurrenceTraits Traits; + typedef OccurrenceTraits Traits; schedule_->processOneOccurrence(lbp, esInfo_->es_, cleaningUpAfterException_); if(subProcess_.get()) subProcess_->doEndLuminosityBlock(lbp, esInfo_->ts_, cleaningUpAfterException_); } diff --git a/FWCore/Framework/src/Worker.h b/FWCore/Framework/src/Worker.h index 4c0f68660d9f5..55d624b007b6c 100644 --- a/FWCore/Framework/src/Worker.h +++ b/FWCore/Framework/src/Worker.h @@ -26,6 +26,7 @@ the worker is reset(). #include "FWCore/Framework/src/WorkerParams.h" #include "FWCore/Framework/interface/Actions.h" #include "FWCore/Framework/interface/CurrentProcessingContext.h" +#include "FWCore/Framework/interface/OccurrenceTraits.h" #include "FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" #include "FWCore/Utilities/interface/Exception.h" @@ -46,6 +47,9 @@ namespace edm { class ProductHolderIndexHelper; class StreamID; + namespace workerhelper { + template< typename O> class CallImpl; + } class Worker { public: enum State { Ready, Pass, Fail, Exception }; @@ -112,19 +116,26 @@ namespace edm { int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon protected: + template friend class workerhelper::CallImpl; virtual std::string workerType() const = 0; - virtual bool implDoBegin(EventPrincipal&, EventSetup const& c, - CurrentProcessingContext const* cpc) = 0; - virtual bool implDoEnd(EventPrincipal&, EventSetup const& c, + virtual bool implDo(EventPrincipal&, EventSetup const& c, CurrentProcessingContext const* cpc) = 0; virtual bool implDoBegin(RunPrincipal& rp, EventSetup const& c, CurrentProcessingContext const* cpc) = 0; + virtual bool implDoStreamBegin(StreamID id, RunPrincipal& rp, EventSetup const& c, + CurrentProcessingContext const* cpc) = 0; + virtual bool implDoStreamEnd(StreamID id, RunPrincipal& rp, EventSetup const& c, + CurrentProcessingContext const* cpc) = 0; virtual bool implDoEnd(RunPrincipal& rp, EventSetup const& c, - CurrentProcessingContext const* cpc) = 0; + CurrentProcessingContext const* cpc) = 0; virtual bool implDoBegin(LuminosityBlockPrincipal& lbp, EventSetup const& c, - CurrentProcessingContext const* cpc) = 0; + CurrentProcessingContext const* cpc) = 0; + virtual bool implDoStreamBegin(StreamID id, LuminosityBlockPrincipal& lbp, EventSetup const& c, + CurrentProcessingContext const* cpc) = 0; + virtual bool implDoStreamEnd(StreamID id, LuminosityBlockPrincipal& lbp, EventSetup const& c, + CurrentProcessingContext const* cpc) = 0; virtual bool implDoEnd(LuminosityBlockPrincipal& lbp, EventSetup const& c, - CurrentProcessingContext const* cpc) = 0; + CurrentProcessingContext const* cpc) = 0; virtual void implBeginJob() = 0; virtual void implEndJob() = 0; @@ -214,6 +225,84 @@ namespace edm { } } + namespace workerhelper { + template<> + class CallImpl> { + public: + static bool call(Worker* iWorker, StreamID, EventPrincipal& ep, EventSetup const& es, + CurrentProcessingContext const* cpc) { + return iWorker->implDo(ep,es,cpc); + } + }; + + template<> + class CallImpl>{ + public: + static bool call(Worker* iWorker,StreamID, RunPrincipal& ep, EventSetup const& es, + CurrentProcessingContext const* cpc) { + return iWorker->implDoBegin(ep,es,cpc); + } + }; + template<> + class CallImpl>{ + public: + static bool call(Worker* iWorker,StreamID id, RunPrincipal& ep, EventSetup const& es, + CurrentProcessingContext const* cpc) { + return iWorker->implDoStreamBegin(id,ep,es,cpc); + } + }; + template<> + class CallImpl>{ + public: + static bool call(Worker* iWorker,StreamID, RunPrincipal& ep, EventSetup const& es, + CurrentProcessingContext const* cpc) { + return iWorker->implDoEnd(ep,es,cpc); + } + }; + template<> + class CallImpl>{ + public: + static bool call(Worker* iWorker,StreamID id, RunPrincipal& ep, EventSetup const& es, + CurrentProcessingContext const* cpc) { + return iWorker->implDoStreamEnd(id,ep,es,cpc); + } + }; + + template<> + class CallImpl>{ + public: + static bool call(Worker* iWorker,StreamID, LuminosityBlockPrincipal& ep, EventSetup const& es, + CurrentProcessingContext const* cpc) { + return iWorker->implDoBegin(ep,es,cpc); + } + }; + template<> + class CallImpl>{ + public: + static bool call(Worker* iWorker,StreamID id, LuminosityBlockPrincipal& ep, EventSetup const& es, + CurrentProcessingContext const* cpc) { + return iWorker->implDoStreamBegin(id,ep,es,cpc); + } + }; + + template<> + class CallImpl>{ + public: + static bool call(Worker* iWorker,StreamID, LuminosityBlockPrincipal& ep, EventSetup const& es, + CurrentProcessingContext const* cpc) { + return iWorker->implDoEnd(ep,es,cpc); + } + }; + template<> + class CallImpl>{ + public: + static bool call(Worker* iWorker,StreamID id, LuminosityBlockPrincipal& ep, EventSetup const& es, + CurrentProcessingContext const* cpc) { + return iWorker->implDoStreamEnd(id,ep,es,cpc); + } + }; + } + template bool Worker::doWork(typename T::MyPrincipal& ep, EventSetup const& es, @@ -244,20 +333,16 @@ namespace edm { try { try { - ModuleSignalSentry cpp(actReg_.get(), md_); - if (T::begin_) { - rc = implDoBegin(ep, es, cpc); - } else { - rc = implDoEnd(ep, es, cpc); - } + ModuleSignalSentry cpp(actReg_.get(), md_); + rc = workerhelper::CallImpl::call(this,streamID,ep,es,cpc); - if (rc) { - state_ = Pass; - if (T::isEvent_) ++timesPassed_; - } else { - state_ = Fail; - if (T::isEvent_) ++timesFailed_; - } + if (rc) { + state_ = Pass; + if (T::isEvent_) ++timesPassed_; + } else { + state_ = Fail; + if (T::isEvent_) ++timesFailed_; + } } catch (cms::Exception& e) { throw; } catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); } diff --git a/FWCore/Framework/src/WorkerT.cc b/FWCore/Framework/src/WorkerT.cc index f01b88eac92ed..d1391f6fad5c3 100644 --- a/FWCore/Framework/src/WorkerT.cc +++ b/FWCore/Framework/src/WorkerT.cc @@ -31,7 +31,7 @@ namespace edm{ template inline bool - WorkerT::implDoBegin(EventPrincipal& ep, EventSetup const& c, CurrentProcessingContext const* cpc) { + WorkerT::implDo(EventPrincipal& ep, EventSetup const& c, CurrentProcessingContext const* cpc) { UnscheduledHandlerSentry s(getUnscheduledHandler(ep), cpc); boost::shared_ptr sentry(this,[&ep](Worker* obj) {obj->postDoEvent(ep);}); return module_->doEvent(ep, c, cpc); @@ -40,15 +40,24 @@ namespace edm{ template inline bool - WorkerT::implDoEnd(EventPrincipal&, EventSetup const&, CurrentProcessingContext const*) { - return false; + WorkerT::implDoBegin(RunPrincipal& rp, EventSetup const& c, CurrentProcessingContext const* cpc) { + module_->doBeginRun(rp, c, cpc); + return true; } template inline bool - WorkerT::implDoBegin(RunPrincipal& rp, EventSetup const& c, CurrentProcessingContext const* cpc) { - module_->doBeginRun(rp, c, cpc); + WorkerT::implDoStreamBegin(StreamID id, RunPrincipal& rp, EventSetup const& c, CurrentProcessingContext const* cpc) { + //module_->doStreamBeginRun(id, rp, c, cpc); + return true; + } + + template + inline + bool + WorkerT::implDoStreamEnd(StreamID id, RunPrincipal& rp, EventSetup const& c, CurrentProcessingContext const* cpc) { + //module_->doStreamEndRun(id, rp, c, cpc); return true; } @@ -68,6 +77,22 @@ namespace edm{ return true; } + template + inline + bool + WorkerT::implDoStreamBegin(StreamID id, LuminosityBlockPrincipal& lbp, EventSetup const& c, CurrentProcessingContext const* cpc) { + //module_->doStreamBeginLuminosityBlock(id, lbp, c, cpc); + return true; + } + + template + inline + bool + WorkerT::implDoStreamEnd(StreamID id, LuminosityBlockPrincipal& lbp, EventSetup const& c, CurrentProcessingContext const* cpc) { + //module_->doStreamEndLuminosityBlock(id, lbp, c, cpc); + return true; + } + template inline bool diff --git a/FWCore/Framework/src/WorkerT.h b/FWCore/Framework/src/WorkerT.h index c3e8e3fe44117..6bf263453072f 100644 --- a/FWCore/Framework/src/WorkerT.h +++ b/FWCore/Framework/src/WorkerT.h @@ -55,18 +55,24 @@ namespace edm { T const& module() const {return *module_;} private: - virtual bool implDoBegin(EventPrincipal& ep, EventSetup const& c, - CurrentProcessingContext const* cpc) override; - virtual bool implDoEnd(EventPrincipal& ep, EventSetup const& c, - CurrentProcessingContext const* cpc) override; + virtual bool implDo(EventPrincipal& ep, EventSetup const& c, + CurrentProcessingContext const* cpc) override; virtual bool implDoBegin(RunPrincipal& rp, EventSetup const& c, - CurrentProcessingContext const* cpc) override; + CurrentProcessingContext const* cpc) override; + virtual bool implDoStreamBegin(StreamID id, RunPrincipal& rp, EventSetup const& c, + CurrentProcessingContext const* cpc) override; + virtual bool implDoStreamEnd(StreamID id, RunPrincipal& rp, EventSetup const& c, + CurrentProcessingContext const* cpc) override; virtual bool implDoEnd(RunPrincipal& rp, EventSetup const& c, CurrentProcessingContext const* cpc) override; virtual bool implDoBegin(LuminosityBlockPrincipal& lbp, EventSetup const& c, CurrentProcessingContext const* cpc) override; + virtual bool implDoStreamBegin(StreamID id, LuminosityBlockPrincipal& lbp, EventSetup const& c, + CurrentProcessingContext const* cpc) override; + virtual bool implDoStreamEnd(StreamID id, LuminosityBlockPrincipal& lbp, EventSetup const& c, + CurrentProcessingContext const* cpc) override; virtual bool implDoEnd(LuminosityBlockPrincipal& lbp, EventSetup const& c, - CurrentProcessingContext const* cpc) override; + CurrentProcessingContext const* cpc) override; virtual void implBeginJob() override; virtual void implEndJob() override; virtual void implRespondToOpenInputFile(FileBlock const& fb) override; From 8ac72af4e4b04194153bcf390f98f982ca854bb4 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Mon, 8 Jul 2013 14:54:12 -0500 Subject: [PATCH 4/4] Merged stream transition changes into CMSSW_7_0_X This finishes the merging of the stream transition code into the CMSSW_7_0_X official branch. Although this code compiles and runs properly for now, when Stream transitions are fully propagated to modules this code will have to be updated to also propagate them. --- Mixing/Base/src/SecondaryEventProvider.cc | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/Mixing/Base/src/SecondaryEventProvider.cc b/Mixing/Base/src/SecondaryEventProvider.cc index 9384a3af13224..f4ea3ebc94b82 100644 --- a/Mixing/Base/src/SecondaryEventProvider.cc +++ b/Mixing/Base/src/SecondaryEventProvider.cc @@ -1,6 +1,7 @@ #include "Mixing/Base/src/SecondaryEventProvider.h" #include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/Utilities/interface/StreamID.h" namespace edm { SecondaryEventProvider::SecondaryEventProvider(std::vector& psets, @@ -25,23 +26,25 @@ namespace edm { } } // SecondaryEventProvider::SecondaryEventProvider + //NOTE: When the Stream interfaces are propagated to the modules, this code must be updated + // to also send the stream based transitions void SecondaryEventProvider::beginRun(RunPrincipal& run, const EventSetup& setup) { - workerManager_.processOneOccurrence >(run, setup); + workerManager_.processOneOccurrence >(run, setup, StreamID::invalidStreamID()); } void SecondaryEventProvider::beginLuminosityBlock(LuminosityBlockPrincipal& lumi, const EventSetup& setup) { - workerManager_.processOneOccurrence >(lumi, setup); + workerManager_.processOneOccurrence >(lumi, setup, StreamID::invalidStreamID()); } void SecondaryEventProvider::endRun(RunPrincipal& run, const EventSetup& setup) { - workerManager_.processOneOccurrence >(run, setup); + workerManager_.processOneOccurrence >(run, setup, StreamID::invalidStreamID()); } void SecondaryEventProvider::endLuminosityBlock(LuminosityBlockPrincipal& lumi, const EventSetup& setup) { - workerManager_.processOneOccurrence >(lumi, setup); + workerManager_.processOneOccurrence >(lumi, setup, StreamID::invalidStreamID()); } void SecondaryEventProvider::setupPileUpEvent(EventPrincipal& ep, const EventSetup& setup) { - workerManager_.processOneOccurrence >(ep, setup); + workerManager_.processOneOccurrence >(ep, setup, ep.streamID()); } }