
Update DiskIo telemetry device to persist the counters #721

Closed
wants to merge 12 commits
2 changes: 1 addition & 1 deletion esrally/mechanic/launcher.py
@@ -295,7 +295,7 @@ def _start_node(self, node_configuration, node_count_on_host):
enabled_devices = self.cfg.opts("mechanic", "telemetry.devices")
telemetry_params = self.cfg.opts("mechanic", "telemetry.params")
node_telemetry = [
-                telemetry.DiskIo(self.metrics_store, node_count_on_host),
+                telemetry.DiskIo(self.metrics_store, node_count_on_host, node_telemetry_dir, car, node_name),
Contributor:
See my question below whether car is actually needed.

telemetry.NodeEnvironmentInfo(self.metrics_store),
telemetry.IndexSize(data_paths, self.metrics_store),
telemetry.MergeParts(self.metrics_store, node_configuration.log_path),
40 changes: 31 additions & 9 deletions esrally/mechanic/telemetry.py
@@ -776,17 +776,21 @@ def _store_merge_times(self, merge_times):


class DiskIo(InternalTelemetryDevice):
human_name = "Disk IO"
Member:
when we get rid of the console message (see my comment below), this will be unused and we can remove it as well.

"""
Gathers disk I/O stats.
"""
-    def __init__(self, metrics_store, node_count_on_host):
+    def __init__(self, metrics_store, node_count_on_host, log_root, car, node_name):
super().__init__()
self.metrics_store = metrics_store
self.node_count_on_host = node_count_on_host
self.node = None
self.process = None
self.disk_start = None
self.process_start = None
self.car = car
self.node_name = node_name
self.log_root = log_root

def attach_to_node(self, node):
self.node = node
@@ -795,35 +799,53 @@ def attach_to_node(self, node):
def on_benchmark_start(self):
if self.process is not None:
self.process_start = sysstats.process_io_counters(self.process)
read_bytes = 0
write_bytes = 0
io.ensure_dir(self.log_root)
log_file = "%s/%s-%s.io" % (self.log_root, self.car.safe_name, self.node.node_name)
Contributor:
This for example produces something like:

.rally/benchmarks/races/2019-07-04-15-05-34/rally-node-0/telemetry/defaults-rally-node-0.io

I don't think self.car.safe_name (in this case defaults) provides value in the filename, in which case you don't even need to pass car in the constructor. WDYT?

Contributor Author:
I agree, but I was trying to be consistent with what we do for gcstats and jfr and other telemetry files. So I'm not sure whether to go for simplicity or consistency. What's your preference?

Member (@danielmitterdorfer, Jul 12, 2019):
Contrary to other telemetry devices this one is not meant to be inspected by the user. Instead this is a temporary state file that should also be removed when the benchmark ends (i.e. in on_benchmark_stop). Hence, I'd opt for simplicity here.

Also, we should maybe use a different name than log_file here? It's not exactly a log file.

Finally, I think we should use os.path.join instead of / when creating paths.

console.info("%s: Writing start I/O stats to [%s]" % (self.human_name, log_file), logger=self.logger)
Member:
As this is only a temporary file that is necessary due to the way the start and stop subcommands will work, we should not inform the user about it.

if self.process_start:
read_bytes = self.process_start.read_bytes
write_bytes = self.process_start.write_bytes
self.logger.info("Using more accurate process-based I/O counters.")
else:
try:
self.disk_start = sysstats.disk_io_counters()
read_bytes = self.disk_start.read_bytes
write_bytes = self.disk_start.write_bytes
self.logger.warning("Process I/O counters are not supported on this platform. Falling back to less accurate disk "
"I/O counters.")
except RuntimeError:
self.logger.exception("Could not determine I/O stats at benchmark start.")
with open(log_file, "wt", encoding="utf-8") as f:
diskio_str = "%d %d" % (read_bytes, write_bytes)
Contributor:
Have you considered writing to a dict and dumping it as json instead? What you have here works perfectly, but I wonder if standardizing to json for all structures we have in Rally makes sense.

So here we'd have {"read_bytes": read_bytes, "write_bytes": write_bytes} in a dict, and then on the next line it could be json.dump(diskio, f)

Contributor Author:
I like your json idea, I will change it to use that. Thanks!
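The JSON variant the thread agrees on could be sketched like this (hypothetical helper names; the dict shape is the one proposed above):

```python
import json


def persist_counters(f, read_bytes, write_bytes):
    # Dump a small dict instead of a space-separated "%d %d" string.
    json.dump({"read_bytes": read_bytes, "write_bytes": write_bytes}, f)


def restore_counters(f):
    # json.load preserves the int type, so no manual split() / int() needed.
    counters = json.load(f)
    return counters["read_bytes"], counters["write_bytes"]
```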

f.write(diskio_str)

def on_benchmark_stop(self):
if self.process is not None:
-            process_end = sysstats.process_io_counters(self.process)
-        disk_end = sysstats.disk_io_counters()
+        io.ensure_dir(self.log_root)
+        log_file = "%s/%s-%s.io" % (self.log_root, self.car.safe_name, self.node.node_name)
Member:
Can we use os.path.join instead of / here?

io_str = ""
Member:
Leftover?

Contributor Author:
fixed

with open(log_file, "rt", encoding="utf-8") as f:
io_str = f.read()
Contributor:
See my earlier suggestion whether we could json.load instead and use it as a dictionary without the need to split.

io_bytes = io_str.split()
# Be aware the semantics of write counts etc. are different for disk and process statistics.
# Thus we're conservative and only report I/O bytes now.
# noinspection PyBroadException
try:
# we have process-based disk counters, no need to worry how many nodes are on this host
if self.process_start:
process_end = sysstats.process_io_counters(self.process)
read_bytes = process_end.read_bytes - self.process_start.read_bytes
write_bytes = process_end.write_bytes - self.process_start.write_bytes
elif self.disk_start:
if process_end:
read_bytes = process_end.read_bytes - int(io_bytes[0])
Contributor:
The json approach, as a benefit, can ensure type safety in a type unsafe language and you can get rid of the int conversions.

write_bytes = process_end.write_bytes - int(io_bytes[1])
elif disk_end:
if self.node_count_on_host > 1:
self.logger.info("There are [%d] nodes on this host and Rally fell back to disk I/O counters. Attributing [1/%d] "
"of total I/O to [%s].", self.node_count_on_host, self.node_count_on_host, self.node.node_name)

disk_end = sysstats.disk_io_counters()
-                read_bytes = (disk_end.read_bytes - self.disk_start.read_bytes) // self.node_count_on_host
-                write_bytes = (disk_end.write_bytes - self.disk_start.write_bytes) // self.node_count_on_host
+                read_bytes = (disk_end.read_bytes - int(io_bytes[0])) // self.node_count_on_host
+                write_bytes = (disk_end.write_bytes - int(io_bytes[1])) // self.node_count_on_host
else:
raise RuntimeError("Neither process nor disk I/O counters are available")
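Taken together, the stop-side delta computation the reviewers converge on could be sketched as a standalone function (simplified and hypothetical: `start` stands for the counters restored from the persisted JSON file, and the counters are passed as plain dicts rather than sysstats result objects):

```python
def compute_io_deltas(start, process_end, disk_end, node_count_on_host):
    # start: counters persisted at benchmark start,
    # e.g. {"read_bytes": ..., "write_bytes": ...}.
    # Prefer the more accurate process-based counters; otherwise fall back
    # to disk-wide counters and attribute 1/node_count_on_host of the
    # total I/O to this node.
    if process_end is not None:
        read = process_end["read_bytes"] - start["read_bytes"]
        write = process_end["write_bytes"] - start["write_bytes"]
    elif disk_end is not None:
        read = (disk_end["read_bytes"] - start["read_bytes"]) // node_count_on_host
        write = (disk_end["write_bytes"] - start["write_bytes"]) // node_count_on_host
    else:
        raise RuntimeError("Neither process nor disk I/O counters are available")
    return read, write
```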
