Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement "ramdisk" online client #12207

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import FWCore.ParameterSet.Config as cms
import sys

subsystem = "Ramdisk"
process = cms.Process(subsystem)

import FWCore.ParameterSet.Config as cms
import FWCore.ParameterSet.VarParsing as VarParsing

process.load('DQM.Integration.config.inputsource_cfi')
process.load('DQMServices.Components.DQMEnvironment_cfi')
process.load('DQM.Integration.config.environment_cfi')

process.dqmEnv.subSystemFolder = subsystem
process.dqmSaver.tag = subsystem

process.analyzer = cms.EDAnalyzer("RamdiskMonitor",
runNumber = process.source.runNumber,
runInputDir = process.source.runInputDir,
streamLabels = cms.untracked.vstring(
"streamDQM",
"streamDQMHistograms",
"streamDQMCalibration",
)
)

process.p = cms.Path(process.analyzer)
process.dqmsave_step = cms.Path(process.dqmEnv * process.dqmSaver)

process.schedule = cms.Schedule(
process.p,
process.dqmsave_step
)
2 changes: 2 additions & 0 deletions DQM/Integration/python/config/inputsource_cfi.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,5 @@
deleteDatFiles = cms.untracked.bool(False),
endOfRunKills = cms.untracked.bool(True),
)

print "Source:", source
56 changes: 30 additions & 26 deletions DQMServices/StreamerIO/plugins/DQMFileIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,25 @@
namespace dqmservices {

DQMFileIterator::LumiEntry DQMFileIterator::LumiEntry::load_json(
const std::string& filename, int lumiNumber, unsigned int datafn_position) {
const std::string& filename, int lumiNumber, int datafn_position) {
boost::property_tree::ptree pt;
read_json(filename, pt);

LumiEntry lumi;
lumi.filename = filename;

// We rely on n_events to be the first item on the array...
lumi.n_events = std::next(pt.get_child("data").begin(), 1)
->second.get_value<std::size_t>();
lumi.n_events_processed = std::next(pt.get_child("data").begin(), 0)
->second.get_value<std::size_t>();

lumi.n_events_accepted = std::next(pt.get_child("data").begin(), 1)
->second.get_value<std::size_t>();

lumi.file_ls = lumiNumber;
lumi.datafn = std::next(pt.get_child("data").begin(), datafn_position)
->second.get_value<std::string>();

if (datafn_position >= 0) {
lumi.datafn = std::next(pt.get_child("data").begin(), datafn_position)
->second.get_value<std::string>();
}

return lumi;
}
Expand All @@ -55,9 +60,7 @@ DQMFileIterator::EorEntry DQMFileIterator::EorEntry::load_json(
return eor;
}

DQMFileIterator::DQMFileIterator(edm::ParameterSet const& pset)
: state_(EOR) {

DQMFileIterator::DQMFileIterator(edm::ParameterSet const& pset) : state_(EOR) {
runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
datafnPosition_ = pset.getUntrackedParameter<unsigned int>("datafnPosition");
runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
Expand Down Expand Up @@ -93,7 +96,6 @@ void DQMFileIterator::reset() {
doc.put("fi_state", std::to_string(state_));
mon_->outputUpdate(doc);
}

}

DQMFileIterator::State DQMFileIterator::state() { return state_; }
Expand Down Expand Up @@ -149,11 +151,11 @@ void DQMFileIterator::advanceToLumi(unsigned int lumi, std::string reason) {
}

void DQMFileIterator::monUpdateLumi(const LumiEntry& lumi) {
if (! mon_.isAvailable())
return;
if (!mon_.isAvailable()) return;

ptree doc;
doc.put(str(boost::format("extra.lumi_seen.lumi%06d") % lumi.file_ls), lumi.state);
doc.put(str(boost::format("extra.lumi_seen.lumi%06d") % lumi.file_ls),
lumi.state);
mon_->outputUpdate(doc);
}

Expand All @@ -171,7 +173,7 @@ void DQMFileIterator::collect(bool ignoreTimers) {

auto now = std::chrono::high_resolution_clock::now();
auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
now - runPathLastCollect_).count();
now - runPathLastCollect_).count();

// don't refresh if it's too soon
if ((!ignoreTimers) && (last_ms >= 0) && (last_ms < 100)) {
Expand All @@ -181,11 +183,12 @@ void DQMFileIterator::collect(bool ignoreTimers) {
// check if directory changed
std::time_t mtime_now = boost::filesystem::last_write_time(runPath_);

if ((!ignoreTimers) && (last_ms < forceFileCheckTimeoutMillis_) && (mtime_now == runPathMTime_)) {
//logFileAction("Directory hasn't changed.");
if ((!ignoreTimers) && (last_ms < forceFileCheckTimeoutMillis_) &&
(mtime_now == runPathMTime_)) {
// logFileAction("Directory hasn't changed.");
return;
} else {
//logFileAction("Directory changed, updating.");
// logFileAction("Directory changed, updating.");
}

runPathMTime_ = mtime_now;
Expand Down Expand Up @@ -293,7 +296,6 @@ void DQMFileIterator::update_state() {
if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
auto iter = lumiSeen_.lower_bound(nextLumiNumber_);
if ((iter != lumiSeen_.end()) && iter->first != nextLumiNumber_) {

auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();

Expand Down Expand Up @@ -340,7 +342,8 @@ void DQMFileIterator::logFileAction(const std::string& msg,
edm::FlushMessageLog();
}

void DQMFileIterator::logLumiState(const LumiEntry& lumi, const std::string& msg) {
void DQMFileIterator::logLumiState(const LumiEntry& lumi,
const std::string& msg) {
if (lumiSeen_.find(lumi.file_ls) != lumiSeen_.end()) {
lumiSeen_[lumi.file_ls].state = msg;

Expand All @@ -351,29 +354,30 @@ void DQMFileIterator::logLumiState(const LumiEntry& lumi, const std::string& msg
}

void DQMFileIterator::delay() {
if (mon_.isAvailable())
mon_->keepAlive();
if (mon_.isAvailable()) mon_->keepAlive();

usleep(delayMillis_ * 1000);
}

void DQMFileIterator::fillDescription(edm::ParameterSetDescription& desc) {

desc.addUntracked<unsigned int>("runNumber")
->setComment("Run number passed via configuration file.");

desc.addUntracked<unsigned int>("datafnPosition", 3)
->setComment("Data filename position in the positional arguments array 'data' in json file.");
->setComment(
"Data filename position in the positional arguments array 'data' in "
"json file.");

desc.addUntracked<std::string>("streamLabel")
->setComment("Stream label used in json discovery.");

desc.addUntracked<uint32_t>("delayMillis")
->setComment("Number of milliseconds to wait between file checks.");

desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)->setComment(
"Number of milliseconds to wait before switching to the next lumi "
"section if the current is missing, -1 to disable.");
desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)
->setComment(
"Number of milliseconds to wait before switching to the next lumi "
"section if the current is missing, -1 to disable.");

desc.addUntracked<std::string>("runInputDir")
->setComment("Directory where the DQM files will appear.");
Expand Down
7 changes: 4 additions & 3 deletions DQMServices/StreamerIO/plugins/DQMFileIterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ class DQMFileIterator {
std::string filename;

unsigned int file_ls;
std::size_t n_events;
std::size_t n_events_processed;
std::size_t n_events_accepted;
std::string datafn;

static LumiEntry load_json(const std::string& filename, int lumiNumber,
unsigned int datafn_position);
int datafn_position);

std::string state;
};
Expand Down Expand Up @@ -112,7 +113,7 @@ class DQMFileIterator {
std::chrono::high_resolution_clock::time_point lastLumiLoad_;

void collect(bool ignoreTimers);
void monUpdateLumi(const LumiEntry& lumi);
void monUpdateLumi(const LumiEntry& lumi);

/* this is for monitoring */
edm::Service<DQMMonitoringService> mon_;
Expand Down
26 changes: 10 additions & 16 deletions DQMServices/StreamerIO/plugins/DQMMonitoringService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <boost/format.hpp>
#include <boost/algorithm/string/predicate.hpp>


#include <ctime>

/*
Expand All @@ -14,7 +13,8 @@

namespace dqmservices {

DQMMonitoringService::DQMMonitoringService(const edm::ParameterSet &pset, edm::ActivityRegistry& ar) {
DQMMonitoringService::DQMMonitoringService(const edm::ParameterSet& pset,
edm::ActivityRegistry& ar) {
const char* x = getenv("DQM2_SOCKET");
if (x) {
std::cerr << "Monitoring pipe: " << x << std::endl;
Expand All @@ -37,13 +37,12 @@ DQMMonitoringService::DQMMonitoringService(const edm::ParameterSet &pset, edm::A
ar.watchPreSourceEvent(this, &DQMMonitoringService::evEvent);
}

DQMMonitoringService::~DQMMonitoringService() {
}
DQMMonitoringService::~DQMMonitoringService() {}

void DQMMonitoringService::outputLumiUpdate() {
using std::chrono::duration_cast;
using std::chrono::milliseconds;

auto now = std::chrono::high_resolution_clock::now();

ptree doc;
Expand Down Expand Up @@ -79,7 +78,6 @@ void DQMMonitoringService::outputLumiUpdate() {
}

outputUpdate(doc);

}

void DQMMonitoringService::evLumi(GlobalContext const& iContext) {
Expand All @@ -92,7 +90,7 @@ void DQMMonitoringService::evLumi(GlobalContext const& iContext) {
lumi_ = iContext.luminosityBlockID().luminosityBlock();

outputLumiUpdate();

last_lumi_time_ = std::chrono::high_resolution_clock::now();
last_lumi_nevents_ = nevents_;
last_lumi_ = lumi_;
Expand All @@ -107,8 +105,7 @@ void DQMMonitoringService::outputUpdate(ptree& doc) {
using std::chrono::duration_cast;
using std::chrono::milliseconds;

if (!mstream_)
return;
if (!mstream_) return;

try {
last_update_time_ = std::chrono::high_resolution_clock::now();
Expand All @@ -122,8 +119,7 @@ void DQMMonitoringService::outputUpdate(ptree& doc) {
}

void DQMMonitoringService::keepAlive() {
if (!mstream_)
return;
if (!mstream_) return;

mstream_ << "\n";
mstream_.flush();
Expand All @@ -135,21 +131,19 @@ void DQMMonitoringService::tryUpdate() {
using std::chrono::duration_cast;
using std::chrono::milliseconds;

if (!mstream_)
return;
if (!mstream_) return;

// sometimes we don't see any transition for a very long time
// but we still want updates
// luckily, keepAlive is called rather often by the input source
auto now = std::chrono::high_resolution_clock::now();
auto millis = duration_cast<milliseconds>(now - last_update_time_).count();
if (millis >= (25*1000)) {
if (millis >= (25 * 1000)) {
outputLumiUpdate();
}
}


} // end-of-namespace
} // end-of-namespace

#include "FWCore/ServiceRegistry/interface/ServiceMaker.h"

Expand Down
48 changes: 25 additions & 23 deletions DQMServices/StreamerIO/plugins/DQMMonitoringService.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,37 +42,39 @@ using edm::StreamContext;
using edm::GlobalContext;

class DQMMonitoringService {
public:
DQMMonitoringService(const edm::ParameterSet &, edm::ActivityRegistry&);
~DQMMonitoringService();
public:
DQMMonitoringService(const edm::ParameterSet&, edm::ActivityRegistry&);
~DQMMonitoringService();

void connect();
void keepAlive();
void connect();
void keepAlive();

void outputLumiUpdate();
void outputUpdate(ptree& doc);
void outputLumiUpdate();
void outputUpdate(ptree& doc);

void evLumi(GlobalContext const&);
void evEvent(StreamID const&);

void tryUpdate();
void evLumi(GlobalContext const&);
void evEvent(StreamID const&);

private:
boost::asio::local::stream_protocol::iostream mstream_;
void tryUpdate();

// global number of events processed
long nevents_;
private:
boost::asio::local::stream_protocol::iostream mstream_;

// time point, number of events and the lumi number at the time we switched to it
unsigned long last_lumi_; // last lumi (we report stats for it, after we switch to the next one)
std::chrono::high_resolution_clock::time_point last_lumi_time_;
std::chrono::high_resolution_clock::time_point last_update_time_;
long last_lumi_nevents_;
// global number of events processed
long nevents_;

unsigned long run_; // current run
unsigned long lumi_; // current lumi
// time point, number of events and the lumi number at the time we switched to
// it
unsigned long last_lumi_; // last lumi (we report stats for it, after we
// switch to the next one)
std::chrono::high_resolution_clock::time_point last_lumi_time_;
std::chrono::high_resolution_clock::time_point last_update_time_;
long last_lumi_nevents_;

unsigned long run_; // current run
unsigned long lumi_; // current lumi
};

} // end-of-namespace
} // end-of-namespace

#endif
Loading