From 0818b674bed3d6f0a98a4b8cf4c4d76131b03130 Mon Sep 17 00:00:00 2001 From: Danilo Piparo Date: Wed, 31 Jul 2024 14:46:22 +0200 Subject: [PATCH] [multiproc] Correct entries ranges w/ nWorkers>nEntries When processing trees with fewer entries than workers with TTreeProcessorMP, some entries were processed multiple times because of a mistake in the algorithm calculating the event ranges. Fixes #15425 --- tree/treeplayer/inc/TMPWorkerTree.h | 42 +++++---- tree/treeplayer/src/TMPWorkerTree.cxx | 16 +++- tree/treeplayer/test/CMakeLists.txt | 6 +- .../treeprocessormt_remotefiles.cxx | 0 .../treeprocessors.cxx} | 88 ++++++++++++++++--- 5 files changed, 117 insertions(+), 35 deletions(-) rename tree/treeplayer/test/{treeprocmt => treeprocs}/treeprocessormt_remotefiles.cxx (100%) rename tree/treeplayer/test/{treeprocmt/treeprocessormt.cxx => treeprocs/treeprocessors.cxx} (89%) diff --git a/tree/treeplayer/inc/TMPWorkerTree.h b/tree/treeplayer/inc/TMPWorkerTree.h index 0dbe7342e4b23..c722fd94faa25 100644 --- a/tree/treeplayer/inc/TMPWorkerTree.h +++ b/tree/treeplayer/inc/TMPWorkerTree.h @@ -199,33 +199,37 @@ void TMPWorkerTreeFunc::Process(UInt_t code, MPCodeBufPair &msg) return; } + // If we are not done processing entries in the tree, // create a TTreeReader that reads this range of entries - TTreeReader reader(fTree, enl); + if (start >= 0 && start < fTree->GetEntries()) { + TTreeReader reader(fTree, enl); + + TTreeReader::EEntryStatus status = reader.SetEntriesRange(start, finish); + if (status != TTreeReader::kEntryValid) { + reply = sn + "could not set TTreeReader to range " + std::to_string(start) + " " + std::to_string(finish - 1); + MPSend(GetSocket(), 
MPCode::kProcError, reply.c_str()); - return; - } + // execute function + auto res = fProcFunc(reader); - //execute function - auto res = fProcFunc(reader); + // detach result from file if needed (currently needed for TH1, TTree, TEventList) + DetachRes(res); - //detach result from file if needed (currently needed for TH1, TTree, TEventList) - DetachRes(res); + if (fCanReduce) { + PoolUtils::ReduceObjects redfunc; + fReducedResult = static_cast(redfunc( + {res, fReducedResult})); // TODO try not to copy these into a vector, do everything by ref. std::vector? + } else { + fCanReduce = true; + fReducedResult = res; + } + } //update the number of processed entries fProcessedEntries += finish - start; - if(fCanReduce) { - PoolUtils::ReduceObjects redfunc; - fReducedResult = static_cast(redfunc({res, fReducedResult})); //TODO try not to copy these into a vector, do everything by ref. std::vector? - } else { - fCanReduce = true; - fReducedResult = res; - } - if(fMaxNEntries == fProcessedEntries) //we are done forever MPSend(GetSocket(), MPCode::kProcResult, fReducedResult); diff --git a/tree/treeplayer/src/TMPWorkerTree.cxx b/tree/treeplayer/src/TMPWorkerTree.cxx index acb63d6438fe9..81c9d38163dd2 100644 --- a/tree/treeplayer/src/TMPWorkerTree.cxx +++ b/tree/treeplayer/src/TMPWorkerTree.cxx @@ -310,12 +310,16 @@ Int_t TMPWorkerTree::LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, UInt_t nBunch = nEntries / fNWorkers; UInt_t rangeN = nProcessed % fNWorkers; start = rangeN * nBunch; - if (rangeN < (fNWorkers - 1)) { + if (start >= nEntries) { + start = finish = nEntries; + } + else if (rangeN < (fNWorkers - 1)) { finish = (rangeN+1)*nBunch; } else { finish = nEntries; } + //process tree tree = fTree; CloseFile(); // May not be needed @@ -389,7 +393,9 @@ Int_t TMPWorkerTree::LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, if(nEntries % fNWorkers) nBunch++; UInt_t rangeN = nProcessed % fNWorkers; start = rangeN * nBunch; - if(rangeN < (fNWorkers-1)) + 
if (start >= nEntries) + start = finish = nEntries; + else if(rangeN < (fNWorkers-1)) finish = (rangeN+1)*nBunch; else finish = nEntries; @@ -409,12 +415,14 @@ Int_t TMPWorkerTree::LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, if (code == MPCode::kProcRange) { // example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21 // and this worker must take the rangeN-th range - ULong64_t nEntries = (*enl)->GetN(); + Long64_t nEntries = (*enl)->GetN(); UInt_t nBunch = nEntries / fNWorkers; if (nEntries % fNWorkers) nBunch++; UInt_t rangeN = nProcessed % fNWorkers; start = rangeN * nBunch; - if (rangeN < (fNWorkers - 1)) + if (start >= nEntries) { + start = finish = nEntries; + } else if (rangeN < (fNWorkers - 1)) finish = (rangeN + 1) * nBunch; else finish = nEntries; diff --git a/tree/treeplayer/test/CMakeLists.txt b/tree/treeplayer/test/CMakeLists.txt index ce2b2106deb80..54d4ece717be5 100644 --- a/tree/treeplayer/test/CMakeLists.txt +++ b/tree/treeplayer/test/CMakeLists.txt @@ -11,9 +11,11 @@ if(NOT MSVC OR win_broken_tests) endif() if(imt) - ROOT_ADD_GTEST(treeprocessormt treeprocmt/treeprocessormt.cxx LIBRARIES TreePlayer) + if (NOT MSVC) + ROOT_ADD_GTEST(treeprocessors treeprocs/treeprocessors.cxx LIBRARIES TreePlayer) + endif() if(xrootd) - ROOT_ADD_GTEST(treeprocessormt_remotefiles treeprocmt/treeprocessormt_remotefiles.cxx LIBRARIES TreePlayer) + ROOT_ADD_GTEST(treeprocessormt_remotefiles treeprocs/treeprocessormt_remotefiles.cxx LIBRARIES TreePlayer) endif() endif() diff --git a/tree/treeplayer/test/treeprocmt/treeprocessormt_remotefiles.cxx b/tree/treeplayer/test/treeprocs/treeprocessormt_remotefiles.cxx similarity index 100% rename from tree/treeplayer/test/treeprocmt/treeprocessormt_remotefiles.cxx rename to tree/treeplayer/test/treeprocs/treeprocessormt_remotefiles.cxx diff --git a/tree/treeplayer/test/treeprocmt/treeprocessormt.cxx b/tree/treeplayer/test/treeprocs/treeprocessors.cxx similarity index 89% rename from 
tree/treeplayer/test/treeprocmt/treeprocessormt.cxx rename to tree/treeplayer/test/treeprocs/treeprocessors.cxx index 9337d86b2916b..aedbf1167fa11 100644 --- a/tree/treeplayer/test/treeprocmt/treeprocessormt.cxx +++ b/tree/treeplayer/test/treeprocs/treeprocessors.cxx @@ -1,21 +1,25 @@ -#include -#include -#include -#include -#include -#include -#include - #include +#include #include #include #include #include +#ifndef MSVC +#include +#endif #include #include "gtest/gtest.h" -void WriteFiles(const std::vector &treenames, const std::vector &filenames) +#include +#include +#include +#include +#include +#include +#include + +void WriteFiles(const std::vector &treenames, const std::vector &filenames, int nEvts = 10) { int v = 0; const auto nFiles = filenames.size(); @@ -27,7 +31,7 @@ void WriteFiles(const std::vector &treenames, const std::vector &filenames) gSystem->Unlink(f.c_str()); } +class FilesRAII { + std::vector fFileNames; + +public: + FilesRAII(const std::vector &treenames, const std::vector &filenames, int nEvts = 10) + : fFileNames(filenames) + { + WriteFiles(treenames, filenames, nEvts); + } + ~FilesRAII() { DeleteFiles(fFileNames); } +}; + +#ifndef MSVC + +class TestSelector : public TSelector { +public: + TParameter fParameter; + + virtual void SlaveBegin(TTree *) {} + virtual bool Process(Long64_t) + { + auto newVal = fParameter.GetVal() + 1; + fParameter.SetVal(newVal); + return true; + } + virtual void SlaveTerminate() { GetOutputList()->Add(fParameter.Clone()); } +}; + +// See issue #15425 +TEST(TreeProcessorMP, moreWorkersThanEvents) +{ + auto func = [](TTreeReader &r) { + int n = 0; + while (r.Next()) + n++; + auto par = new TParameter("n", n); + return par; + }; + + std::vector files = {"f_moreWorkersThanEvents.root"}; + std::vector trees = {"t"}; + + FilesRAII fr(trees, files, 2); + + // Test function + { + ROOT::TTreeProcessorMP proc(3); + auto res = proc.Process(files, func); + + EXPECT_EQ(2, res->GetVal()) << "The counter incremented in the 
worker processes has the wrong value."; + + delete res; + } + // Test selector + { + ROOT::TTreeProcessorMP pool(3); + TestSelector sel; + auto resl = pool.Process(files[0], sel); + auto tparami = (TParameter *)resl->At(0); + EXPECT_EQ(2, tparami->GetVal()) << "The counter incremented in the worker processes has the wrong value."; + } +} +#endif // MSVC + TEST(TreeProcessorMT, EmptyTChain) { TChain c("mytree");