forked from facebookincubator/senpai
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsenpai.py
184 lines (151 loc) · 6.19 KB
/
senpai.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#!/usr/bin/env python3
#
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2.
import argparse
import datetime
import math
import time
import sys
import os
def h(x):
    """Translate a number of bytes into a human-readable string.

    The value is divided by 1024 until its magnitude drops below 1024 or
    the largest known suffix ('T') is reached, then formatted with two
    decimals followed by the suffix.

    Args:
        x: Byte count (int or float); may be negative.

    Returns:
        A string such as '512.00', '1.00k' or '2.50G'.
    """
    suffix = ['', 'k', 'M', 'G', 'T']
    max_order = len(suffix) - 1
    order = 0
    # Use >= so that an exact power of 1024 rolls over to the next unit
    # (1024 -> '1.00k' rather than '1024.00').
    while abs(x) >= 1024 and order < max_order:
        x /= 1024.0
        order += 1
    return '%.2f%s' % (x, suffix[order])
def log(string):
    """Print *string* to stdout, prefixed with a wall-clock timestamp.

    The timestamp comes from ``datetime.datetime.now()`` and is rendered as
    ``YYYY-MM-DD HH:MM:SS``, followed by a single space and the message.
    """
    stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # str.join only accepts strings, so a non-str message still raises
    # TypeError exactly as plain concatenation would.
    print(' '.join((stamp, string)))
class Cgroup:
    """Thin accessor for the memory control files of one cgroup directory.

    The instance remembers the last value it wrote to ``memory.high`` in
    ``self.limit`` and keeps every new value within
    ``[limit_min, limit_max]``.
    """

    def __init__(self, path, limit_min, limit_max):
        """Bind to the cgroup directory *path* and install an initial limit.

        Reading the pressure file first makes construction fail early when
        the file is missing, before anything is written.
        """
        self.limit_min = limit_min
        self.limit_max = limit_max
        self.path = path
        self.limit = None
        self.pressure()  # Ensure psi is available in Linux 4.20+
        # self.limit is still None here, so import_limit() will not match
        # the on-disk value and starts from memory.current.
        self.set_limit(None)

    # Parsing the memory pressure file format:
    #
    # some avg10=0.00 avg60=0.00 avg300=0.00 total=0
    # full avg10=0.00 avg60=0.00 avg300=0.00 total=0

    def pressure(self):
        """Return the ``avg10`` value of the first pressure line as a float."""
        first_line = self.readlines("memory.pressure")[0]
        avg10_field = first_line.split()[1]
        return float(avg10_field.split('=')[1])

    def total(self):
        """Return the ``total`` value of the first pressure line as an int."""
        first_line = self.readlines("memory.pressure")[0]
        total_field = first_line.split()[4]
        return int(total_field.split('=')[1])

    # Pressure adjustments

    def scale_limit(self, factor):
        """Grow (factor > 0) or shrink (factor < 0) the limit proportionally.

        The new value is the current ``memory.high`` plus ``factor`` times
        itself, truncated to an int and then sanitized by set_limit().
        """
        current = self.read_limit()
        self.set_limit(int(current + current * factor))

    def set_limit(self, limit):
        """Sanitize *limit*, remember it, and write it to ``memory.high``."""
        self.import_limit(limit)
        self.write("memory.high", str(self.limit))

    def import_limit(self, limit):
        """Sanitize limit parameter and store it in self.limit."""
        # Init or race condition? Reset limit to memory.current
        if self.read_limit() != self.limit:
            log('Resetting limit to memory.current.')
            limit = self.read_current()
        capped = min(self.limit_max, limit)
        bounded = max(self.limit_min, capped)
        # Clear the low 12 bits, i.e. round down to a multiple of 4096.
        self.limit = bounded & ~4095

    # Cgroupfs IO

    def read_current(self):
        """Return the content of ``memory.current`` as an int."""
        return int(self.read("memory.current"))

    def read_limit(self):
        """Return ``memory.high`` as an int; "max" maps to 2**64 - 1."""
        raw = self.read("memory.high")
        return (1 << 64) - 1 if raw == "max\n" else int(raw)

    def _file(self, filename):
        """Return the full path of *filename* inside this cgroup directory."""
        return os.path.join(self.path, filename)

    def read(self, filename):
        """Return the entire content of *filename* as one string."""
        with open(self._file(filename)) as handle:
            return handle.read()

    def readlines(self, filename):
        """Return the content of *filename* as a list of lines."""
        with open(self._file(filename)) as handle:
            return list(handle)

    def write(self, filename, string):
        """Overwrite *filename* with *string*."""
        with open(self._file(filename), 'w') as handle:
            handle.write(string)
class Senpai(object):
    """Control loop that steers a cgroup's memory limit by observed pressure.

    Once per second the loop samples the cumulative ``total`` counter from
    the cgroup's pressure file and accumulates the increase in
    ``self.integral``.  If the accumulated value exceeds ``conf.pressure``
    the limit is raised immediately; otherwise, once the grace period has
    run out, the limit is lowered.  Every adjustment resets the integral.
    """

    def __init__(self, conf):
        """Log the configuration and attach to the configured cgroup.

        Args:
            conf: Parsed options object providing ``cgpath``, ``min_size``,
                ``max_size``, ``interval``, ``pressure``, ``max_probe``,
                ``max_backoff``, ``coeff_probe`` and ``coeff_backoff``.
        """
        self.conf = conf
        log('Configuration:')
        for key, val in vars(conf).items():
            log(f' {key} = {val}')
        self.cgroup = Cgroup(self.conf.cgpath,
                             self.conf.min_size,
                             self.conf.max_size)
        # Baseline for computing per-tick deltas of the cumulative counter.
        self.last_total = self.cgroup.total()
        self.integral = 0
        # Number of ticks that must pass before the next probe.
        self.grace_ticks = self.conf.interval

    def run(self):
        """Call tick() once per second, forever."""
        while True:
            time.sleep(1)
            self.tick()

    def tick(self):
        """Take one sample and, if warranted, adjust the limit."""
        total = self.cgroup.total()
        delta = total - self.last_total
        self.last_total = total
        self.integral += delta
        log('limit=%s pressure=%f time_to_probe=%2d total=%d delta=%d integral=%d' %
            (h(self.cgroup.read_limit()), self.cgroup.pressure(),
             self.grace_ticks, total, delta, self.integral))
        if self.integral > self.conf.pressure:
            # Back off more strongly the further we overshoot the target
            # pressure. The adjustment grows with the square of the
            # overshoot ratio; the backoff coefficient defines how
            # sensitive we are to fluctuations around the target: when the
            # coefficient is 10, the curve reaches the adjustment limit
            # when pressure is ten times the target pressure.
            err = self.integral / self.conf.pressure
            adj = (err / self.conf.coeff_backoff) ** 2
            adj = min(adj * self.conf.max_backoff, self.conf.max_backoff)
            self.adjust(adj)
            self.grace_ticks = self.conf.interval - 1
            return
        # Compare against zero explicitly: with an interval of 0 the
        # counter is reset to -1 after an adjustment, and a plain
        # truthiness test would keep decrementing it forever and never
        # probe again.
        if self.grace_ticks > 0:
            self.grace_ticks -= 1
            return
        # Tighten the limit. Like when backing off, the adjustment
        # becomes more aggressive (quadratically) as observed pressure
        # falls below the target pressure and reaches the adjustment
        # limit when pressure is 1/COEFF_PROBE of target pressure.
        # max(..., 1) avoids dividing by zero when no pressure was seen.
        err = self.conf.pressure / max(self.integral, 1)
        adj = (err / self.conf.coeff_probe) ** 2
        adj = min(adj * self.conf.max_probe, self.conf.max_probe)
        self.adjust(-adj)
        self.grace_ticks = self.conf.interval - 1

    def adjust(self, factor):
        """Scale the cgroup limit by *factor* and restart accumulation.

        Args:
            factor: Relative change; positive loosens, negative tightens.
        """
        log(f' adjust: {factor}')
        self.cgroup.scale_limit(factor)
        self.integral = 0
# Command-line interface.  The parser is built at module level (this has no
# side effects); parsing sys.argv and starting the endless control loop only
# happen when the file is executed as a script.
parser = argparse.ArgumentParser(description="""
Senpai takes a cgroup and dynamically adjusts its memory range
between MIN_SIZE and MAX_SIZE using psi memory pressure data.
Senpai targets cumulative memory delays of PRESSURE microseconds over
the sampling period of INTERVAL seconds. If observed pressure exceeds
this target, senpai will loosen the limit, otherwise tighten it.
Corrective action scales exponentially with the error between observed
pressure and target pressure, and is bounded by MAX_PROBE, MAX_BACKOFF.
These maximums are reached when observed pressure is at 1/COEFF_PROBE
of target pressure, or COEFF_BACKOFF times target pressure, respectively.
""",
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('cgpath', type=str)
# Bounds for the limit: 100 MiB and 100 GiB by default.
parser.add_argument('--min-size', type=int, default=100 << 20)
parser.add_argument('--max-size', type=int, default=100 << 30)
parser.add_argument('--interval', type=int, default=6)
parser.add_argument('--pressure', type=int, default=10*1000)
# Largest relative change applied per adjustment when tightening / loosening.
parser.add_argument('--max-probe', type=float, default=0.01)
parser.add_argument('--max-backoff', type=float, default=1.0)
parser.add_argument('--coeff-probe', type=int, default=10)
parser.add_argument('--coeff-backoff', type=int, default=20)

if __name__ == "__main__":
    conf = parser.parse_args()
    senpai = Senpai(conf)
    senpai.run()