Skip to content

Commit

Permalink
[multiproc] Correct entries ranges w/ nWorkers>nEntries
Browse files Browse the repository at this point in the history
When processing trees with fewer entries than workers with TTreeProcessorMP,
some entries were processed multiple times because of a mistake in the
algorithm calculating the event ranges.

Thanks to @hageboeck for the help with the output management of the test.

Fixes root-project#15425
  • Loading branch information
dpiparo committed Jul 31, 2024
1 parent eedcd27 commit 0e7387f
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 15 deletions.
14 changes: 11 additions & 3 deletions tree/treeplayer/src/TMPWorkerTree.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -310,12 +310,16 @@ Int_t TMPWorkerTree::LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start,
UInt_t nBunch = nEntries / fNWorkers;
UInt_t rangeN = nProcessed % fNWorkers;
start = rangeN * nBunch;
if (rangeN < (fNWorkers - 1)) {
if (start >= nEntries) {
start = finish = nEntries;
}
else if (rangeN < (fNWorkers - 1)) {
finish = (rangeN+1)*nBunch;
} else {
finish = nEntries;
}


//process tree
tree = fTree;
CloseFile(); // May not be needed
Expand Down Expand Up @@ -389,7 +393,9 @@ Int_t TMPWorkerTree::LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start,
if(nEntries % fNWorkers) nBunch++;
UInt_t rangeN = nProcessed % fNWorkers;
start = rangeN * nBunch;
if(rangeN < (fNWorkers-1))
if (start >= nEntries)
start = finish = nEntries;
else if(rangeN < (fNWorkers-1))
finish = (rangeN+1)*nBunch;
else
finish = nEntries;
Expand All @@ -414,7 +420,9 @@ Int_t TMPWorkerTree::LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start,
if (nEntries % fNWorkers) nBunch++;
UInt_t rangeN = nProcessed % fNWorkers;
start = rangeN * nBunch;
if (rangeN < (fNWorkers - 1))
if (ULong64_t(start) >= nEntries) {
start = finish = nEntries;
} else if (rangeN < (fNWorkers - 1))
finish = (rangeN + 1) * nBunch;
else
finish = nEntries;
Expand Down
4 changes: 2 additions & 2 deletions tree/treeplayer/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ if(NOT MSVC OR win_broken_tests)
endif()

if(imt)
ROOT_ADD_GTEST(treeprocessormt treeprocmt/treeprocessormt.cxx LIBRARIES TreePlayer)
ROOT_ADD_GTEST(treeprocessors treeprocs/treeprocessors.cxx LIBRARIES TreePlayer)
if(xrootd)
ROOT_ADD_GTEST(treeprocessormt_remotefiles treeprocmt/treeprocessormt_remotefiles.cxx LIBRARIES TreePlayer)
ROOT_ADD_GTEST(treeprocessormt_remotefiles treeprocs/treeprocessormt_remotefiles.cxx LIBRARIES TreePlayer)
endif()
endif()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
#include <algorithm>
#include <atomic>
#include <chrono>
#include <random>
#include <string>
#include <thread>
#include <utility>

#include <TFile.h>
#include <TParameter.h>
#include <TTree.h>
#include <TSystem.h>
#include <TTreeReader.h>
#include <TTreeReaderValue.h>
#include <ROOT/TTreeProcessorMP.hxx>
#include <ROOT/TTreeProcessorMT.hxx>

#include "gtest/gtest.h"
#include "ROOT/TestSupport.hxx"

void WriteFiles(const std::vector<std::string> &treenames, const std::vector<std::string> &filenames)
#include <algorithm>
#include <atomic>
#include <chrono>
#include <random>
#include <string>
#include <thread>
#include <utility>

void WriteFiles(const std::vector<std::string> &treenames, const std::vector<std::string> &filenames, int nEvts = 10)
{
int v = 0;
const auto nFiles = filenames.size();
Expand All @@ -27,7 +30,7 @@ void WriteFiles(const std::vector<std::string> &treenames, const std::vector<std
TFile file(fname.c_str(), "recreate");
TTree t(treename.c_str(), treename.c_str());
t.Branch("v", &v);
for (auto e = 0; e < 10; ++e) {
for (auto e = 0; e < nEvts; ++e) {
++v;
t.Fill();
}
Expand Down Expand Up @@ -56,6 +59,42 @@ void DeleteFiles(const std::vector<std::string> &filenames)
gSystem->Unlink(f.c_str());
}

/// Scope guard for the files used by a test.
///
/// The constructor forwards its arguments to WriteFiles() and keeps a copy of the file names;
/// the destructor hands those names to DeleteFiles(), so the files are cleaned up when the
/// guard goes out of scope.
class FilesRAII {
   std::vector<std::string> fFileNames; ///< Names passed to DeleteFiles() on destruction.

public:
   /// \param treenames Tree names, forwarded to WriteFiles().
   /// \param filenames File names, forwarded to WriteFiles() and remembered for clean-up.
   /// \param nEvts Forwarded to WriteFiles(); defaults to 10.
   FilesRAII(const std::vector<std::string> &treenames, const std::vector<std::string> &filenames, int nEvts = 10)
      : fFileNames(filenames)
   {
      WriteFiles(treenames, filenames, nEvts);
   }
   ~FilesRAII() { DeleteFiles(fFileNames); }

   // The guard is the single owner of the clean-up duty: a copy would call DeleteFiles()
   // a second time on the same names, so copying (and, implicitly, moving) is disabled.
   FilesRAII(const FilesRAII &) = delete;
   FilesRAII &operator=(const FilesRAII &) = delete;
};

// Regression test for issue #15425: more workers than entries.
// A file holding a 2-entry tree is processed with 3 workers; the count reported back by
// Process() must be exactly 2.
TEST(TreeProcessorMP, moreWorkersThanEvents)
{
   // Count the entries the reader hands out and report the total as a TParameter<int> named "n".
   auto countEntries = [](TTreeReader &reader) {
      int nRead = 0;
      while (reader.Next())
         ++nRead;
      return new TParameter<int>("n", nRead);
   };

   std::vector<std::string> treeNames = {"t"};
   std::vector<std::string> fileNames = {"f_moreWorkersThanEvents.root"};

   // Write the input file now; the guard's destructor deletes it again.
   FilesRAII inputFiles(treeNames, fileNames, 2);

   // Register the error diagnostic from TTreeProcessorMP::HandlePoolCode with the diagnostics checker.
   ROOT::TestSupport::CheckDiagsRAII diagChecker;
   diagChecker.requiredDiag(
      kError, "TTreeProcessorMP::HandlePoolCode",
      "[E][C] a worker encountered an error: [S2]: could not set TTreeReader to range 2 1", false);

   ROOT::TTreeProcessorMP processor(3);
   auto total = processor.Process(fileNames, countEntries);

   EXPECT_EQ(2, total->GetVal()) << "The counter incremented in the worker processes has the wrong value.";
}

TEST(TreeProcessorMT, EmptyTChain)
{
TChain c("mytree");
Expand Down

0 comments on commit 0e7387f

Please sign in to comment.