Skip to content

Commit

Permalink
Merge pull request #47073 from Dr15Jones/testsStreamer
Browse files Browse the repository at this point in the history
Improve behavior of streamer system
  • Loading branch information
cmsbuild authored Jan 11, 2025
2 parents 19e8412 + bd2576d commit 3d5b63a
Show file tree
Hide file tree
Showing 10 changed files with 232 additions and 26 deletions.
2 changes: 2 additions & 0 deletions IOPool/Streamer/interface/StreamerOutputModuleBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ namespace edm {
private:
edm::EDGetTokenT<edm::TriggerResults> trToken_;
edm::EDGetTokenT<SendJobHeader::ParameterSetMap> psetToken_;
edm::ProcessHistoryID lastHistory_;
bool lastCallWasBeginRun_ = false;
bool initWritten_ = false;

}; //end-of-class-def
} // namespace streamer
Expand Down
15 changes: 2 additions & 13 deletions IOPool/Streamer/src/StreamerInputFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,19 +321,8 @@ namespace edm::streamer {
eventSize = head.size();
if (code != Header::EVENT) {
if (code == Header::INIT) {
edm::LogWarning("StreamerInputFile") << "Found another INIT header in the file. It will be skipped";
if (eventSize < sizeof(EventHeader)) {
//very unlikely case that EventHeader is larger than total INIT size inserted in the middle of the file
hdrSkipped = nGot - eventSize;
memmove(&eventBuf_[0], &eventBuf_[eventSize], hdrSkipped);
continue;
}
if (headerBuf_.size() < eventSize)
headerBuf_.resize(eventSize);
memcpy(&headerBuf_[0], &eventBuf_[0], nGot);
readBytes(&headerBuf_[nGot], eventSize, true, nGot);
//do not parse this header and proceed to the next event
continue;
throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
<< "Found another INIT header in the file.";
}
throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
<< "Failed reading streamer file, unknown code in event header\n"
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Streamer/src/StreamerInputSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ namespace edm::streamer {
pReg.updateFromInput(descs);
std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
if (!mergeInfo.empty()) {
throw cms::Exception("MismatchedInput", "RootInputFileSequence::previousEvent()") << mergeInfo;
throw cms::Exception("MismatchedInput", "StreamerInputSource::mergeIntoRegistry") << mergeInfo;
}
} else {
declareStreamers(descs);
Expand Down
37 changes: 25 additions & 12 deletions IOPool/Streamer/src/StreamerOutputModuleBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,31 @@ namespace edm::streamer {
void StreamerOutputModuleBase::beginRun(RunForOutput const& iRun) {
start();

auto psetMapHandle = iRun.getHandle(psetToken_);

std::unique_ptr<InitMsgBuilder> init_message =
serializeRegistry(OutputModule::processName(),
description().moduleLabel(),
moduleDescription().mainParameterSetID(),
psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);

doOutputHeader(*init_message);
lastCallWasBeginRun_ = true;

clearHeaderBuffer();
if (not initWritten_) {
auto psetMapHandle = iRun.getHandle(psetToken_);

std::unique_ptr<InitMsgBuilder> init_message =
serializeRegistry(OutputModule::processName(),
description().moduleLabel(),
moduleDescription().mainParameterSetID(),
psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);

doOutputHeader(*init_message);
lastCallWasBeginRun_ = true;
auto history = iRun.processHistory();
lastHistory_ = history.reduce().id();
initWritten_ = true;

clearHeaderBuffer();
} else {
auto history = iRun.processHistory();
if (lastHistory_ != history.reduce().id()) {
throw edm::Exception(errors::FileWriteError) << "Streamer output can not handle writing a new Run if the "
"ProcessHistory changed since the last Run written.";
}
//need to write meta data anyway
lastCallWasBeginRun_ = true;
}
}

void StreamerOutputModuleBase::endRun(RunForOutput const&) { stop(); }
Expand Down
4 changes: 4 additions & 0 deletions IOPool/Streamer/test/BuildFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@

<test name="TestIOPoolStreamerRefProductIDMetadataConsistency" command="run_TestRefProductIDMetadataConsistencyStreamer.sh"/>

<test name="TestIOPoolStreamerRefMerge" command="run_RefMerge.sh"/>

<test name="TestIOPoolStreamerFailures" command="run_failures.sh"/>

<library file="StreamThingProducer.cc" name="StreamThingProducer">
<flags EDM_PLUGIN="1"/>
<use name="DataFormats/TestObjects"/>
Expand Down
26 changes: 26 additions & 0 deletions IOPool/Streamer/test/ref_merge_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import FWCore.ParameterSet.Config as cms
import argparse
import sys

parser = argparse.ArgumentParser(prog=sys.argv[0], description='Test ConditionalTasks.')

parser.add_argument("--inFile1", help="first file to read")
parser.add_argument("--inFile2", help="second file to read")
parser.add_argument("--outFile", help="name of output file", default="ref_merge.root")

args = parser.parse_args()

process = cms.Process("MERGE")

from IOPool.Streamer.modules import NewEventStreamFileReader
process.source = NewEventStreamFileReader(fileNames = [f"file:{args.inFile1}",
f"file:{args.inFile2}"]
)

from IOPool.Streamer.modules import EventStreamFileWriter
process.out = EventStreamFileWriter(fileName = args.outFile)

from FWCore.Integration.modules import OtherThingAnalyzer
process.tester = OtherThingAnalyzer(other = ("d","testUserTag"))

process.o = cms.EndPath(process.out+process.tester)
59 changes: 59 additions & 0 deletions IOPool/Streamer/test/ref_merge_prod_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import FWCore.ParameterSet.Config as cms

import argparse
import sys

parser = argparse.ArgumentParser(prog=sys.argv[0], description='Test ConditionalTasks.')

parser.add_argument("--extraProducers", help="Add extra producers to configuration", action="store_true")
parser.add_argument("--fileName", help="name of output file")
parser.add_argument("--firstLumi", help="LuminosityBlock number for first lumi", type = int, default=1)
parser.add_argument("--firstRun", help="LuminosityBlock number for first run", type = int, default=1)
parser.add_argument("--keepAllProducts", help="Keep all products made in the job", action="store_true")
parser.add_argument("--dropThings", help="drop the Things collections so the refs will not function", action="store_true")

args = parser.parse_args()


process = cms.Process("PROD")

nEvents = 10
from FWCore.Modules.modules import EmptySource
process.source = EmptySource(firstRun = args.firstRun,
firstLuminosityBlock = args.firstLumi,
firstEvent = nEvents*(args.firstLumi-1)+1
)

process.maxEvents.input = nEvents

if args.extraProducers:
from FWCore.Framework.modules import IntProducer
process.a = IntProducer(ivalue = 1)

process.b = IntProducer(ivalue = 2)

from FWCore.Integration.modules import ThingProducer, OtherThingProducer, OtherThingAnalyzer
process.c = ThingProducer()

process.d = OtherThingProducer(thingTag="c")

outputs = []
if not args.keepAllProducts:
outputs = ["drop *",
"keep edmtestOtherThings_*_*_*"]
if not args.dropThings:
outputs.append("keep edmtestThings_*_*_*")


from IOPool.Streamer.modules import EventStreamFileWriter
process.o = EventStreamFileWriter(outputCommands = outputs,
fileName = args.fileName
)
if args.extraProducers:
process.p = cms.Path(process.a+process.b+process.c*process.d)
else:
process.p = cms.Path(process.c*process.d)

process.tester = OtherThingAnalyzer(other = ("d","testUserTag"))

process.out = cms.EndPath(process.o+process.tester)
19 changes: 19 additions & 0 deletions IOPool/Streamer/test/ref_merge_test_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import FWCore.ParameterSet.Config as cms
import argparse
import sys

parser = argparse.ArgumentParser(prog=sys.argv[0], description='Test Refs after merge.')

parser.add_argument("--fileName", help="file to read")

args = parser.parse_args()

process = cms.Process("TEST")

from IOPool.Streamer.modules import NewEventStreamFileReader
process.source = NewEventStreamFileReader(fileNames = [f"file:{args.fileName}"])

from FWCore.Integration.modules import OtherThingAnalyzer
process.tester = OtherThingAnalyzer(other = ("d","testUserTag"))

process.e = cms.EndPath(process.tester)
45 changes: 45 additions & 0 deletions IOPool/Streamer/test/run_RefMerge.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/bin/bash

test=ref_merge_

function die { echo Failure $1: status $2 ; exit $2 ; }

LOCAL_TEST_DIR=${SCRAM_TEST_PATH}
#------------- same configs, same run ------------

echo ${test}prod_a ------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --fileName 'ref_merge_proda.root' || die "cmsRun ${test}prod_cfg.py" $?

echo ${test}prod_b ------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --firstLumi 10 --fileName 'ref_merge_prodb.root'|| die "cmsRun ${test}prod_cfg.py" $?

echo ${test}MERGE_same------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}cfg.py --inFile1 'ref_merge_proda.root' --inFile2 'ref_merge_prodb.root' --outFile 'ref_merge_same.root' || die "cmsRun ${test}cfg.py same" $?

echo ${test}test_same------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}test_cfg.py --fileName 'ref_merge_same.root' || die "cmsRun ${test}test_cfg.py same" $?

#------------- same configs different stored products, same run ------------
# works if subsequent files have a strict subset of stored products of the first file

echo ${test}prod_b ------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --firstLumi 10 --fileName 'ref_merge_prod_all.root' --keepAllProducts || die "cmsRun ${test}prod_cfg.py" $?

echo ${test}MERGE_diff_prods1------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}cfg.py --inFile2 'ref_merge_proda.root' --inFile1 'ref_merge_prod_all.root' --outFile 'ref_merge_diff_prods.root' || die "cmsRun ${test}cfg.py diff prods" $?

echo ${test}test_diff_prods1------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}test_cfg.py --fileName 'ref_merge_diff_prods.root' || die "cmsRun ${test}test_cfg.py diff prods" $?

#------------- same configs, different run ------------

echo ${test}prod_run10 ------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --firstRun 10 --fileName 'ref_merge_prod_run10.root'|| die "cmsRun ${test}prod_cfg.py run10" $?

echo ${test}MERGE_diff_runs------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}cfg.py --inFile1 'ref_merge_proda.root' --inFile2 'ref_merge_prod_run10.root' --outFile 'ref_merge_diffRuns.root' || die "cmsRun ${test}cfg.py diff runs" $?

echo ${test}test_diff_runs------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}test_cfg.py --fileName 'ref_merge_diffRuns.root' || die "cmsRun ${test}test_cfg.py diff runs" $?

exit 0
49 changes: 49 additions & 0 deletions IOPool/Streamer/test/run_failures.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/bin/bash

test=ref_merge_

function die { echo Failure $1: status $2 ; exit $2 ; }

LOCAL_TEST_DIR=${SCRAM_TEST_PATH}
#------------- same configs, same run ------------

echo ${test}prod_a ------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --fileName 'ref_merge_proda.root' || die "cmsRun ${test}prod_cfg.py" $?

echo ${test}prod_b ------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --firstLumi 10 --fileName 'ref_merge_prodb.root'|| die "cmsRun ${test}prod_cfg.py" $?

#------------- same configs, same run using cat ------------
cat ref_merge_proda.root ref_merge_prodb.root > ref_merge_cat.root

echo ${test}test_cat------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}test_cfg.py --fileName 'ref_merge_cat.root' && die "cmsRun ${test}test_cfg.py same" 1

#------------- same configs different stored products, same run ------------

echo ${test}prod_ass ------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --firstLumi 10 --fileName 'ref_merge_prod_all.root' --keepAllProducts || die "cmsRun ${test}prod_cfg.py" $?

echo ${test}MERGE_diff_prods2------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}cfg.py --inFile1 'ref_merge_proda.root' --inFile2 'ref_merge_prod_all.root' --outFile 'ref_merge_diff_prods2.root' && die "cmsRun ${test}cfg.py diff prods 2" 1

#------------- different configs ------------

echo ${test}prod1 ------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --extraProducers --fileName 'ref_merge_prod1.root' || die "cmsRun ${test}prod_cfg.py --extraProducers" $?

echo ${test}prod2 ------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --firstLumi 10 --fileName 'ref_merge_prod2.root'|| die "cmsRun ${test}prod_cfg.py" $?

echo ${test}MERGE_diff_configs------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}cfg.py --inFile1 'ref_merge_prod1.root' --inFile2 'ref_merge_prod2.root' --outFile 'ref_merge.root' && die "cmsRun ${test}cfg.py diff configs" 1

#------------- different configs and different products ------------

echo ${test}keepAllProd ------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}prod_cfg.py --extraProducers --keepAllProducts --fileName 'ref_merge_prod_all.root' || die "cmsRun ${test}prod_cfg.py --keepAllProducts" $?

echo ${test}MERGE_keepAll1st ------------------------------------------------------------
cmsRun ${LOCAL_TEST_DIR}/${test}cfg.py --inFile2 'ref_merge_prod_all.root' --inFile1 'ref_merge_prod2.root' --outFile 'ref_merge_all1st.root' && die "cmsRun ${test}cfg.py" 1

exit 0

0 comments on commit 3d5b63a

Please sign in to comment.