-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathpluginbase.py
118 lines (93 loc) · 3.18 KB
/
pluginbase.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import logging
import threading
import typing
from abc import ABC
from abc import abstractmethod
from ktoolbox import common
from tftbase import PluginOutput
from tftbase import TestMetadata
# Module-level logger; its name is this module's name under the "tft." prefix.
logger = logging.getLogger("tft." + __name__)
class Plugin(ABC):
    """Abstract base class for plugins.

    Subclasses set ``PLUGIN_NAME`` and implement ``_enable()``. The public
    ``enable()`` entry point delegates to ``_enable()`` and logs the tasks
    it returned.
    """

    # Identifier of the plugin. It is used in log messages and is the key
    # under which register_plugin() stores the plugin. Subclasses override it.
    PLUGIN_NAME = ""

    @property
    def log_name(self) -> str:
        """Return the ``plugin[<PLUGIN_NAME>]`` tag used to prefix log lines."""
        return f"plugin[{self.PLUGIN_NAME}]"

    def enable(
        self,
        *,
        ts: "TestSettings",
        perf_server: "ServerTask",
        perf_client: "ClientTask",
        tenant: bool,
    ) -> list["PluginTask"]:
        """Enable the plugin and return the tasks it created.

        All arguments are keyword-only and are forwarded unchanged to
        ``_enable()``. The resulting task list is logged at debug level.

        Returns:
            The list of tasks returned by ``_enable()``.
        """
        tasks = self._enable(
            ts=ts,
            perf_server=perf_server,
            perf_client=perf_client,
            tenant=tenant,
        )
        logger.debug(f"{self.log_name}: enable ({len(tasks)} tasks, {tasks})")
        return tasks

    @abstractmethod
    def _enable(
        self,
        *,
        ts: "TestSettings",
        perf_server: "ServerTask",
        perf_client: "ClientTask",
        tenant: bool,
    ) -> list["PluginTask"]:
        """Create the plugin's tasks; must be implemented by subclasses.

        Receives the same keyword-only arguments that were passed to
        ``enable()``.
        """
        pass

    def eval_plugin_output(
        self,
        md: TestMetadata,
        plugin_output: PluginOutput,
    ) -> PluginOutput:
        """Log the already-evaluated outcome of a plugin run.

        Logs an error (including ``plugin_output.eval_msg``) when
        ``plugin_output.eval_success`` is false, and a debug message
        otherwise. ``md`` is only used to describe the test in the log line.

        Returns:
            ``plugin_output``, unchanged.
        """
        if not plugin_output.eval_success:
            logger.error(
                f"{self.PLUGIN_NAME} plugin failed for {common.dataclass_to_json(md)}: {plugin_output.eval_msg}"
            )
        else:
            logger.debug(
                f"{self.PLUGIN_NAME} plugin succeeded for {common.dataclass_to_json(md)}"
            )
        # Currently this does not do anything beyond logging: success was
        # already evaluated before this method is called.
        return plugin_output
# Lock taken around reads and writes of _plugin_registry. threading.Lock is
# not reentrant, so it must not be acquired again while already held.
_plugin_registry_lock = threading.Lock()
# Registered plugins, keyed by their PLUGIN_NAME.
_plugin_registry: dict[str, Plugin] = {}
def _get_plugin_registry() -> dict[str, Plugin]:
    """Return the plugin registry after importing the plugin modules.

    The returned dict is the module-level registry itself, not a copy. This
    function does not acquire ``_plugin_registry_lock``; callers take the
    lock around their own accesses to the returned dict.
    """
    # Plugins self-register via pluginbase.register_plugin()
    # when being imported. Ensure they are imported.
    #
    # The imports are local to this function rather than at module level.
    # NOTE(review): presumably because the plugin modules import pluginbase
    # themselves, which would otherwise be a cyclic import -- confirm.
    import pluginMeasureCpu  # noqa: F401
    import pluginMeasurePower  # noqa: F401
    import pluginValidateOffload  # noqa: F401

    return _plugin_registry
def register_plugin(plugin: Plugin) -> Plugin:
    """Add ``plugin`` to the registry under its ``PLUGIN_NAME``.

    Registering the very same object a second time is accepted and changes
    nothing.

    Returns:
        The ``plugin`` that was passed in.

    Raises:
        ValueError: If a different plugin object is already registered under
            the same name.
    """
    name = plugin.PLUGIN_NAME
    with _plugin_registry_lock:
        # setdefault() inserts the plugin only when the name is still free
        # and otherwise hands back whatever is already stored there.
        registered = _plugin_registry.setdefault(name, plugin)
    if registered is plugin:
        return plugin
    raise ValueError(f"plugin {repr(name)} is already registered")
def get_all() -> list[Plugin]:
    """Return every registered plugin, ordered by plugin name."""
    # Fetch the registry before taking the lock: fetching it imports the
    # plugin modules, and those register themselves through
    # register_plugin(), which acquires the same non-reentrant lock.
    registry = _get_plugin_registry()
    with _plugin_registry_lock:
        by_name = sorted(registry.items(), key=lambda entry: entry[0])
        return [plugin for _name, plugin in by_name]
def get_by_name(plugin_name: str) -> Plugin:
    """Look up a registered plugin by its name.

    Returns:
        The plugin registered under ``plugin_name``.

    Raises:
        ValueError: If no plugin is registered under ``plugin_name``.
    """
    # Fetch the registry before taking the lock: fetching it imports the
    # plugin modules, and those register themselves through
    # register_plugin(), which acquires the same non-reentrant lock.
    registry = _get_plugin_registry()
    with _plugin_registry_lock:
        found = registry.get(plugin_name)
    if found is not None:
        return found
    raise ValueError(f'Plugin "{plugin_name}" does not exist')
if typing.TYPE_CHECKING:
    # "pluginbase" cannot import modules like perf, task or testSettings,
    # because those modules import "pluginbase" in turn. However, to forward
    # declare type annotations, we do need those modules here. Import them
    # only under TYPE_CHECKING, which avoids the cyclic dependency between
    # the modules at runtime.
    from task import ClientTask
    from task import ServerTask
    from task import PluginTask
    from testSettings import TestSettings