-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathminesweeper.py
executable file
·442 lines (365 loc) · 12.8 KB
/
minesweeper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
#!/usr/bin/env python3
"""
minesweeper script
Continuous process, on each node via DaemonSet,
to identify processes that could be considered for termination:
- determine which processes are "suspicious" (see herorat.py)
- produce report on suspicious pods:
- show running processes (`ps aux`)
- tail pod logs
- automatically terminate pods likely to be abuse, etc.
"""
import asyncio
import copy
import glob
import json
import os
import pprint
import re
import signal
import socket
import sys
import threading
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from operator import attrgetter
from textwrap import indent
# herorat located in secrets/minesweeper/
import herorat
import kubernetes.client
import kubernetes.config
import psutil
from herorat import inspect_pod, inspect_process
from kubernetes.stream import stream
# load in-cluster credentials for the kubernetes client at import time
# (presumably requires running inside a pod with a service account -- confirm)
kubernetes.config.load_incluster_config()
# module-level client; the functions in this file use the thread-local get_kube() instead
kube = kubernetes.client.CoreV1Api()
# thread-local storage holding one kubernetes client per thread (see get_kube)
local = threading.local()
# live configuration, filled in by load_config()
config = {}
# node we are running on: $NODE_NAME if set, otherwise this host's hostname
hostname = os.environ.get("NODE_NAME", socket.gethostname())
# defaults applied on every load_config(); values from the config file override them
default_config = {
    # passed to get_procs() via node_report(); the uid filter there is currently disabled
    "userid": 1000,
    # node_report(): also inspect/report processes that match no pod
    "inspect_procs_without_pod": False,
    # node_report(): inspect processes in dind build containers (and SIGKILL when flagged)
    "inspect_dind": True,
    # size of the shared thread pool (read once, when get_pool() creates it)
    "threads": 8,
    # seconds to sleep between node_report() runs in main()
    "interval": 300,
    "namespace": os.environ.get("NAMESPACE", "default"),
    # passed as keyword arguments to list_namespaced_pod() in get_pods()
    "pod_selectors": {
        "label_selector": "component=singleuser-server",
        "field_selector": f"spec.nodeName={hostname}",
    },
    # number of log lines fetched per suspicious pod by log_pod()
    "log_tail_lines": 100,
    # process attributes to retrieve
    # see psutil.as_dict docs for available fields:
    # https://psutil.readthedocs.io/en/latest/#psutil.Process.as_dict
    "proc_attrs": [
        "cmdline",
        "cpu_percent",
        "cpu_times",
        "exe",
        "memory_info",
        "name",
        "pid",
        "ppid",
        "status",
        "uids",
    ],
    # max pod age in seconds; not read in this file (presumably consumed by herorat).
    # NOTE(review): this was previously commented "8 hour max age, cleanup after
    # culler" (8h would be 28800), but the value is 0 -- confirm what 0 means
    # to the consumer (e.g. age check disabled).
    "pod_max_age_seconds": 0,
}
# herorat's defaults are merged in and win on any key both define
default_config.update(herorat.default_config)
def get_kube():
    """Return this thread's kubernetes CoreV1Api client, creating it on first use.

    One client is cached per thread on the thread-local ``local`` object,
    because kubernetes client objects are presumably not safe to share
    between threads.
    """
    try:
        return local.kube
    except AttributeError:
        # first call on this thread: build and cache a client
        local.kube = kubernetes.client.CoreV1Api()
        return local.kube
class Proc(dict):
    """A process record: a dict subclass with attribute access for its keys.

    Keys normally come from psutil's ``as_dict``/``process_iter`` info,
    plus these extra keys:

    - ``suspicious`` / ``should_terminate``: default to False and are set
      by inspection. They can be booleans or truthy strings explaining
      why the process is suspicious or should be terminated.
    - ``cmd``: the command line joined into a single string.
    - ``cpu_total``: the sum of all ``cpu_times`` fields (user, system,
      children, etc.).

    psutil reports ``None`` for attributes it could not read (AccessDenied,
    zombie processes). A missing or ``None`` ``cmdline``/``cpu_times`` is
    therefore treated as empty rather than raising.
    """

    def __init__(self, **kwargs):
        kwargs.setdefault("suspicious", False)
        kwargs.setdefault("should_terminate", False)
        super().__init__(**kwargs)
        # secondary derived fields
        self["cmd"] = " ".join(self.get("cmdline") or [])
        self["cpu_total"] = sum(self.get("cpu_times") or [])

    def __repr__(self):
        key_fields = ", ".join(
            f"{key}={self.get(key)}"
            for key in ("pid", "status", "suspicious", "should_terminate", "cmd")
            if self.get(key) is not None
        )
        return f"{self.__class__.__name__}({key_fields})"

    def __getattr__(self, key):
        # Only reached when normal attribute lookup fails. Raise
        # AttributeError (not KeyError) so hasattr(), getattr() with a
        # default, copy and pickle follow the attribute protocol.
        try:
            return self[key]
        except KeyError:
            raise AttributeError(key) from None

    def __setattr__(self, key, value):
        # attributes are stored as dict items
        self[key] = value
def get_procs(userid):
    """Return every process that has a command line, busiest first.

    ``userid`` is meant for a per-user filter that is currently disabled
    (see the TODO below), so it has no effect. Each process is wrapped in
    a ``Proc`` and the list is ordered by ``cpu_percent``, highest first.
    """
    # TODO: should we filter to userid?
    # For now: skip userid filtering, because we want to catch all
    # processes in pods, even if they ran setuid. The filter would be:
    #     if p.info["uids"].real != userid: continue
    procs = [
        Proc(**p.info)
        for p in psutil.process_iter(attrs=config["proc_attrs"])
        # ignore empty commands, e.g. kernel processes
        if p.info["cmdline"]
    ]
    procs.sort(key=attrgetter("cpu_percent"), reverse=True)
    return procs
def get_pods():
    """List pods in the configured namespace that match ``pod_selectors``.

    Returns the ``items`` list of the API response as plain dicts.
    """
    # With _preload_content=False the client hands back the raw HTTP
    # response without deserializing it into model objects, so the JSON
    # body is parsed here to get plain dicts.
    response = get_kube().list_namespaced_pod(
        config["namespace"],
        _preload_content=False,
        **config["pod_selectors"],
    )
    body = json.loads(response.read().decode("utf8"))
    return body["items"]
def pods_by_uid(pods):
    """Index pods by uid.

    Returns a dict mapping ``pod["metadata"]["uid"]`` to the pod itself.
    If two pods share a uid, the later one wins.
    """
    by_uid = {}
    for pod in pods:
        uid = pod["metadata"]["uid"]
        by_uid[uid] = pod
    return by_uid
def get_all_pod_uids():
    """Return a mapping of pid to the uid of the kubernetes pod it runs in.

    The pod uid is parsed from ``/proc/<pid>/cgroup``. The mapping leaves out:

    - processes whose cgroups contain no ``pod<uid>`` component;
    - processes that exit while the scan is running.
    """
    pod_uids = {}
    for cgroup_file in glob.glob("/proc/[0-9]*/cgroup"):
        pid = int(cgroup_file.split("/")[-2])
        try:
            with open(cgroup_file) as f:
                cgroups = f.read()
        except (FileNotFoundError, ProcessLookupError):
            # The process exited between glob and open (ENOENT) or between
            # open and read (ESRCH): ignore it.
            continue
        # The "[/-]" prefix, "." terminator and "_" -> "-" replacement
        # appear to cover both ".../pod<uid>/..." paths and systemd-style
        # "...-pod<uid_with_underscores>.slice" names.
        m = re.search(r"[/-]pod([^/\.]+)", cgroups)
        if m is None:
            # not a pod proc
            continue
        pod_uids[pid] = m.group(1).replace("_", "-")
    return pod_uids
def get_dind_procs():
    """Return ``Proc`` records for processes in dind-created containers.

    Processes are identified by cgroup. A process is skipped if it:

    - exits during the scan;
    - has a command line psutil could not read.
    """
    procs = []
    for cgroup_file in glob.glob("/proc/[0-9]*/cgroup"):
        pid = int(cgroup_file.split("/")[-2])
        try:
            with open(cgroup_file) as f:
                cgroups = f.read()
        except (FileNotFoundError, ProcessLookupError):
            # process exited between glob and open (ENOENT) or between
            # open and read (ESRCH): ignore it
            continue
        # the dind-created cgroups for build containers
        # are nested under an extra /docker/ level below the dind pod's own cgroup
        # dind pod itself: /kubepods/burstable/pod{u-u-i-d}/{abc123}
        # container run by dind: {dind_pod_cgroup}/docker/{def456}
        m = re.search("/pod[^/]+/[^/]+/docker/(.+)", cgroups)
        if m is None:
            # not a dind proc
            continue
        try:
            proc_dict = psutil.Process(pid).as_dict(config["proc_attrs"])
        except psutil.NoSuchProcess:
            # Process exited before it could be inspected: skip it.
            # Falling through here would reuse the previous iteration's
            # proc_dict, or raise NameError on the first iteration.
            continue
        if proc_dict.get("cmdline") is None:
            # psutil reports None when cmdline can't be read (e.g. zombies);
            # Proc needs a list to build its "cmd" field.
            continue
        procs.append(Proc(**proc_dict))
    return procs
def associate_pods_procs(pods, procs):
    """Group processes under the pods they belong to.

    ``pods`` is a dict of pods keyed by pod uid. Every pod gets
    ``pod["minesweeper"] = {"procs": [...]}``, replacing any existing entry.
    The list holds the procs whose cgroup maps to that pod's uid. Procs that
    match no pod are collected separately.

    Returns (pods, procs_without_pods). ``pods`` is the same dict, mutated.
    """
    for pod in pods.values():
        pod["minesweeper"] = {"procs": []}

    uid_for_pid = get_all_pod_uids()
    orphans = []
    for proc in procs:
        owner = pods.get(uid_for_pid.get(proc.pid))
        if owner:
            owner["minesweeper"]["procs"].append(proc)
        else:
            orphans.append(proc)
    return pods, orphans
def ps_pod(pod, userid=1000):
    """Run ``ps aux`` inside a pod and return its output.

    ``userid`` is currently unused and is kept for interface compatibility.

    Errors are never raised. Any failure, including a non-zero exit status
    from ``ps``, is returned as a descriptive string so the caller can
    always include the result in its report.
    """
    kube = get_kube()
    client = None
    try:
        client = stream(
            kube.connect_get_namespaced_pod_exec,
            pod["metadata"]["name"],
            namespace=pod["metadata"]["namespace"],
            command=["ps", "aux"],
            stderr=True,
            stdin=False,
            stdout=True,
            _preload_content=False,
        )
        client.run_forever(timeout=60)
        stderr = client.read_stderr()
        if stderr.strip():
            print(f"err! {stderr}", file=sys.stderr)
        stdout = client.read_stdout()
        returncode = client.returncode
        if returncode:
            raise RuntimeError(f"stdout={stdout}\nstderr={stderr}")
        return stdout
    except Exception as e:
        return f"Error reporting on ps in {pod['metadata']['name']}: {e}"
    finally:
        # With _preload_content=False we own the exec websocket; run_forever
        # only stops reading (e.g. on timeout). Close it on every path so
        # the connection is released.
        if client is not None:
            try:
                client.close()
            except Exception as e:
                print(
                    f"Error closing exec stream for {pod['metadata']['name']}: {e}",
                    file=sys.stderr,
                )
def log_pod(pod):
    """Return the last ``log_tail_lines`` lines of a pod's logs.

    API failures are not raised. The error is returned as a string so it
    can be embedded in the report.
    """
    kube = get_kube()
    meta = pod["metadata"]
    try:
        return kube.read_namespaced_pod_log(
            meta["name"],
            namespace=meta["namespace"],
            tail_lines=config["log_tail_lines"],
        )
    except Exception as err:
        return f"Error collecting logs for {meta['name']}: {err}"
async def report_pod(pod):
    """Print a report for one pod: its ``ps aux`` output and recent logs.

    Both are collected concurrently on the thread pool. The whole report is
    emitted in a single ``print`` call so concurrent reports don't interleave
    line by line.
    """
    name = pod["metadata"]["name"]
    ps_output, log_output = await asyncio.gather(
        in_pool(partial(ps_pod, pod)),
        in_pool(partial(log_pod, pod)),
    )
    sections = [
        name,
        f"ps {name}:",
        indent(ps_output, " "),
        f"logs {name}:",
        indent(log_output, " "),
    ]
    print("\n".join(sections))
def terminate_pod(pod):
    """Delete a pod. Blocking; meant to be called in a worker thread.

    A pod that is already gone (HTTP 404, e.g. deleted by something else
    after we listed it) is reported and ignored. One stale pod therefore
    doesn't make the awaiting ``asyncio.gather`` (and the whole report run)
    fail. Other API errors still propagate.
    """
    namespace = pod["metadata"]["namespace"]
    name = pod["metadata"]["name"]
    print(f"Deleting pod {name}")
    kube = get_kube()
    try:
        kube.delete_namespaced_pod(name=name, namespace=namespace)
    except kubernetes.client.rest.ApiException as e:
        if e.status != 404:
            raise
        print(f"Pod {name} already deleted")
async def node_report(pods=None, userid=1000):
    """Print a report of suspicious processes on a single node.

    Steps:
    1. List this node's pods, keyed by uid, unless ``pods`` is given. Also
       list all processes that have a command line (``userid`` is passed to
       ``get_procs``).
    2. Attach processes to pods, run ``inspect_process`` on each pod's
       processes, then ``inspect_pod`` on every pod.
    3. Start a ps + logs report task for every suspicious pod.
    4. If ``inspect_procs_without_pod`` is set, print suspicious processes
       that match no pod.
    5. If ``inspect_dind`` is set, inspect dind build processes and SIGKILL
       those flagged ``should_terminate``.
    6. Wait for the pod reports, then delete every pod whose
       ``pod["minesweeper"]["should_terminate"]`` is truthy.

    NOTE(review): ``pod["minesweeper"]["should_terminate"]`` is not set in
    this file; presumably ``inspect_pod`` sets it -- confirm, otherwise
    step 6 raises KeyError.
    """
    if pods is None:
        pods = pods_by_uid(await in_pool(get_pods))
    procs = await in_pool(lambda: get_procs(userid))
    print(f"Total processes for {hostname}: {len(procs)}\n", end="")
    pods, procs_without_pod = associate_pods_procs(pods, procs)

    # inspect all procs in our pods
    # (user_procs holds the pre-inspection procs and is only used for the count below)
    user_procs = []
    for pod in pods.values():
        user_procs.extend(pod["minesweeper"]["procs"])
        pod["minesweeper"]["procs"] = [
            inspect_process(p) for p in pod["minesweeper"]["procs"]
        ]
    print(f"Total user pods for {hostname}: {len(pods)}\n", end="")
    print(f"Total user processes for {hostname}: {len(user_procs)}\n", end="")
    suspicious_pods = [pod for pod in pods.values() if inspect_pod(pod)["suspicious"]]
    print(f"Pods of interest for {hostname}: {len(suspicious_pods)}")

    # report on all suspicious pods
    report_futures = []
    for pod in suspicious_pods:
        fut = asyncio.ensure_future(report_pod(pod))
        report_futures.append(fut)
        # yield to the event loop so the new report task can start
        # submitting its ps/log work while we continue
        await asyncio.sleep(0)

    # report on suspicious processes with no matching pod
    suspicious_procs_without_pod = []
    if config["inspect_procs_without_pod"]:
        procs_without_pod = [inspect_process(p) for p in procs_without_pod]
        suspicious_procs_without_pod = [p for p in procs_without_pod if p.suspicious]
    if suspicious_procs_without_pod:
        print(
            f"No pods found for {len(suspicious_procs_without_pod)} suspicious processes on {hostname}:"
        )
        for proc in suspicious_procs_without_pod:
            print(f" {proc.pid}: {proc.cmd}")

    # report on suspicious dind processes
    if config["inspect_dind"]:
        dind_procs = [inspect_process(p) for p in get_dind_procs()]
        print(f"Total dind processes for {hostname}: {len(dind_procs)}")
        for proc in dind_procs:
            if proc.should_terminate:
                print(f"dind process should terminate: {proc}")
                # dind processes have no pod of their own to delete, so they
                # are killed directly by the pid read from /proc in get_dind_procs
                try:
                    os.kill(proc.pid, signal.SIGKILL)
                except OSError as e:
                    print(f"Failed to kill {proc}: {e}")
            elif proc.suspicious:
                print(f"dind process is suspicious: {proc}")
        # TODO: find a way to identify the build repo responsible for suspicious processes in dind
        # (the draft below filters procs_without_pod, not dind_procs)
        # suspicious_dind_procs_without_pod = [
        #     p for p in procs_without_pod if p.suspicious
        # ]

    if report_futures:
        await asyncio.gather(*report_futures)

    # finally, terminate pods that meet the immediate termination condition
    pods_to_terminate = [
        pod for pod in pods.values() if pod["minesweeper"]["should_terminate"]
    ]
    if pods_to_terminate:
        # deletions run in parallel on the thread pool; gather() is called
        # without return_exceptions, so the first failure propagates out of
        # node_report
        terminate_futures = [
            in_pool(partial(terminate_pod, pod)) for pod in pods_to_terminate
        ]
        await asyncio.gather(*terminate_futures)
def get_pool(n=None):
    """Return the shared ThreadPoolExecutor, creating it lazily.

    The pool is sized from ``config["threads"]`` when first created. Later
    changes to that setting do not resize it. ``n`` is currently unused.
    """
    pool = get_pool._pool
    if pool is None:
        pool = get_pool._pool = ThreadPoolExecutor(config["threads"])
    return pool


# cached executor, created on the first get_pool() call
get_pool._pool = None
def in_pool(func):
    """Submit ``func()`` to the shared thread pool; return an asyncio future for its result."""
    return asyncio.wrap_future(get_pool().submit(func))
def load_config():
    """(Re)load configuration into the global ``config`` dict.

    The config file comes from a mounted config map that may change while
    we run, so it is re-read on every call.

    - Each load starts from a fresh deep copy of ``default_config`` and then
      applies the file's values, so keys removed from the file stop taking
      effect.
    - ``config`` is updated in place (same dict object).
    - ``config`` is always shared with ``herorat.config``, even when there is
      no config file.

    Prints the config whenever it differs from the previous load and
    returns the global ``config`` dict.
    """
    prior_config = copy.deepcopy(config)
    # deep copy so nested defaults (lists/dicts) are never shared with
    # default_config through the live config
    new_config = copy.deepcopy(default_config)
    config_file = "/etc/minesweeper/minesweeper.json"
    if os.path.isfile(config_file):
        with open(config_file) as f:
            file_config = json.load(f)
        new_config.update(file_config)
    else:
        print(f"No such file: {config_file}")
    # replace the contents in place so every existing reference to
    # `config` (including herorat's) sees the new values
    config.clear()
    config.update(new_config)
    # sync global config with herorat
    herorat.config = config
    if config != prior_config:
        print("Loaded config:")
        pprint.pprint(config)
    return config
async def main():
    """Run node_report forever, waiting ``config["interval"]`` seconds between runs.

    The config is reloaded before each run because the config map behind
    it can change at any time.
    """
    while True:
        cfg = load_config()
        await node_report(userid=cfg["userid"])
        await asyncio.sleep(cfg["interval"])


if __name__ == "__main__":
    asyncio.run(main())