Feature/kubernetes connector refactor #462

Merged
merged 26 commits into main from feature/kubernetes-connector-refactor on Oct 5, 2022

26 commits
0572d4d
WIP check in of kubernetes connector refactor
linkous8 Sep 17, 2022
ada71ed
Resolve circular import, refactor connectors
linkous8 Sep 19, 2022
9c15775
Refactor inject_sidecar, iterate on unit test fail
linkous8 Sep 20, 2022
8860e00
Iterate unit test failures
linkous8 Sep 20, 2022
94cb8d4
Iterate failing unit tests
linkous8 Sep 20, 2022
092535d
Add xfail for intermittent minikube metrics server
linkous8 Sep 20, 2022
3faa70f
Iterate unit tests, k8s cleanup
linkous8 Sep 21, 2022
ec89ff4
Remove flakey minikube testing
linkous8 Sep 22, 2022
b6364d1
Merge branch 'main' into feature/kubernetes-connector-refactor
linkous8 Sep 22, 2022
a414d56
Iterate integration tests
linkous8 Sep 22, 2022
74be21b
Iterate integration tests
linkous8 Sep 23, 2022
0461fe5
Fix replicaset match_labels missing underscore
linkous8 Sep 23, 2022
4c059b4
Iterate integration tests
linkous8 Sep 23, 2022
1569e7a
Fix opsani_dev test helpers looking for metadata
linkous8 Sep 23, 2022
1466f95
Fix opsani dev tests: refresh deployment before
linkous8 Sep 26, 2022
a89ffca
Iterate tests: add unclosed file ignore for pytest
linkous8 Sep 26, 2022
988931d
Iterate integration tests
linkous8 Sep 26, 2022
e498c81
Iterate integration tests
linkous8 Sep 26, 2022
6fd20cf
Fix inject_sidecar, add logic to return empty list
linkous8 Sep 27, 2022
ab87147
Fix kube_metrics bad copy paste
linkous8 Sep 27, 2022
84574a8
Attempt to stabilize flakey integration test
linkous8 Sep 27, 2022
3941389
Fix incorrect validation on statefulset config
linkous8 Sep 29, 2022
f6ee88f
Bump aggressive test timeout
linkous8 Sep 29, 2022
78bdd5b
Address PR feedback
linkous8 Oct 5, 2022
b4e36d6
Rename selector utility for increased clarity
linkous8 Oct 5, 2022
d65dc66
Handle conflict response in inject_sidecar
linkous8 Oct 5, 2022
2 changes: 1 addition & 1 deletion .github/workflows/docker.yaml
@@ -20,7 +20,7 @@ jobs:
should_skip: ${{ steps.skip_check.outputs.should_skip }}
steps:
- id: skip_check
uses: fkirc/skip-duplicate-actions@v4
uses: fkirc/skip-duplicate-actions@master
with:
concurrent_skipping: 'same_content'
paths_ignore: '["**/README.md", "**/docs/**", "CHANGELOG.md"]'
28 changes: 9 additions & 19 deletions servo/cli.py
@@ -28,6 +28,7 @@
import click
import devtools
import kubernetes_asyncio
import kubernetes_asyncio.config
import loguru
import pydantic
import pygments
@@ -44,6 +45,8 @@
import servo.utilities.yaml
import servo.utilities.strings

from servo.connectors.kubernetes_helpers import DeploymentHelper


class Section(str, enum.Enum):
assembly = "Assembly Commands"
@@ -1557,34 +1560,21 @@ def inject_sidecar(

if target.startswith("deploy"):
deployment = run_async(
servo.connectors.kubernetes.Deployment.read(
target.split("/", 1)[1], namespace
)
DeploymentHelper.read(target.split("/", 1)[1], namespace)
)
run_async(
deployment.inject_sidecar(
"opsani-envoy", image, service=service, port=port
DeploymentHelper.inject_sidecar(
deployment, "opsani-envoy", image, service=service, port=port
)
)
typer.echo(
f"Envoy sidecar injected to Deployment {deployment.name} in {namespace}"
f"Envoy sidecar injected to Deployment {deployment.metadata.name} in {namespace}"
)

elif target.startswith("rollout"):
rollout = run_async(
servo.connectors.kubernetes.Rollout.read(
target.split("/", 1)[1], namespace
)
)
run_async(
rollout.inject_sidecar(
"opsani-envoy", image, service=service, port=port
)
)
typer.echo(
f"Envoy sidecar injected to Rollout {rollout.name} in {namespace}"
raise typer.BadParameter(
"Rollout sidecar injection is not yet implemented"
)

elif target.startswith("pod"):
raise typer.BadParameter("Pod sidecar injection is not yet implemented")
else:
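Stepping back from the diff: the CLI no longer builds a `servo.connectors.kubernetes.Deployment` object and calls methods on it. It reads a raw `V1Deployment` through the stateless `DeploymentHelper` and passes that object back into the helper for sidecar injection, with Rollout injection explicitly deferred. The following is a minimal sketch of that flow outside the CLI, assuming only the `DeploymentHelper.read` and `DeploymentHelper.inject_sidecar` signatures visible above; the wrapper function, example image, and defaults are illustrative assumptions, not part of the PR.

```python
import asyncio
from typing import Optional

import kubernetes_asyncio.config

from servo.connectors.kubernetes_helpers import DeploymentHelper


async def inject_envoy(
    name: str,
    namespace: str,
    image: str,
    service: str,
    port: Optional[int] = None,
) -> None:
    # Load a local kubeconfig for out-of-cluster use (in-cluster setup differs).
    await kubernetes_asyncio.config.load_kube_config()

    # The helper returns a plain kubernetes_asyncio V1Deployment rather than the
    # old servo.connectors.kubernetes.Deployment wrapper, so metadata is read
    # via deployment.metadata.name.
    deployment = await DeploymentHelper.read(name, namespace)

    # The helper is stateless: the workload object is passed in explicitly along
    # with the sidecar name, image, and routing details.
    await DeploymentHelper.inject_sidecar(
        deployment, "opsani-envoy", image, service=service, port=port
    )
    print(
        f"Envoy sidecar injected to Deployment {deployment.metadata.name} in {namespace}"
    )


if __name__ == "__main__":
    # Hypothetical invocation; the workload, namespace, image, and service are examples.
    asyncio.run(
        inject_envoy("web", "default", "opsani/envoy-proxy:latest", service="web")
    )
```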
172 changes: 89 additions & 83 deletions servo/connectors/kube_metrics.py
@@ -14,21 +14,26 @@

import servo
from servo.checks import CheckError
from servo.connectors.kubernetes_helpers import (
dict_to_selector,
find_container,
get_containers,
ContainerHelper,
DeploymentHelper,
PodHelper,
StatefulSetHelper,
)
from servo.connectors.kubernetes import (
Container,
Deployment,
DNSSubdomainName,
Core,
PermissionSet,
Pod,
ResourceRequirement,
Rollout,
selector_string,
ShortByteSize,
)
from servo.types import DataPoint, Metric, TimeSeries
import servo.types
from servo.types import DataPoint, Metric, TimeSeries, Resource, ResourceRequirement

import kubernetes_asyncio.client
from kubernetes_asyncio.client import V1Container, V1Deployment, V1Pod, V1StatefulSet
import kubernetes_asyncio.client.api_client
import kubernetes_asyncio.client.exceptions
import kubernetes_asyncio.config
@@ -84,8 +89,10 @@ class KubeMetricsConfiguration(servo.BaseConfiguration):
description="Namespace of the target resource"
)
name: str = pydantic.Field(description="Name of the target resource")
kind: pydantic.constr(regex=r"^([Dd]eployment|[Rr]ollout)$") = pydantic.Field(
default="Deployment", description="Kind of the target resource"
kind: str = pydantic.Field(
default="Deployment",
description="Kind of the target resource",
regex=r"^([Dd]eployment|[Ss]tateful[Ss]et)$",
)
container: Optional[str] = pydantic.Field(
default=None, description="Name of the target resource container"
@@ -141,22 +148,18 @@ async def check_metrics_api_permissions(self) -> None:
for permission in KUBERNETES_PERMISSIONS:
for resource in permission.resources:
for verb in permission.verbs:
attributes = (
kubernetes_asyncio.client.models.V1ResourceAttributes(
namespace=self.config.namespace,
group=permission.group,
resource=resource,
verb=verb,
)
attributes = kubernetes_asyncio.client.V1ResourceAttributes(
namespace=self.config.namespace,
group=permission.group,
resource=resource,
verb=verb,
)

spec = kubernetes_asyncio.client.models.V1SelfSubjectAccessReviewSpec(
spec = kubernetes_asyncio.client.V1SelfSubjectAccessReviewSpec(
resource_attributes=attributes
)
review = (
kubernetes_asyncio.client.models.V1SelfSubjectAccessReview(
spec=spec
)
review = kubernetes_asyncio.client.V1SelfSubjectAccessReview(
spec=spec
)
access_review = await v1.create_self_subject_access_review(
body=review
@@ -171,7 +174,9 @@ async def check_metrics_api(self) -> None:
async with kubernetes_asyncio.client.api_client.ApiClient() as api:
cust_obj_api = kubernetes_asyncio.client.CustomObjectsApi(api_client=api)
await cust_obj_api.list_namespaced_custom_object(
label_selector=selector_string(target_resource.match_labels),
label_selector=dict_to_selector(
target_resource.spec.selector.match_labels
),
namespace=self.config.namespace,
**METRICS_CUSTOM_OJBECT_CONST_ARGS,
)
@@ -181,17 +186,13 @@ async def check_target_containers(self) -> None:
target_resource = await _get_target_resource(self.config)
if self.config.container:
assert (
next(
(
c
for c in target_resource.containers
if c.name == self.config.container
),
None,
)
find_container(workload=target_resource, name=self.config.container)
is not None
), f"Configured container {self.config.container} was not found in target app containers ({', '.join((c.name for c in target_resource.containers))})"
elif len(target_resource.containers) > 1:
), (
f"Configured container {self.config.container} was not found in target app containers"
f" ({', '.join((c.name for c in get_containers(workload=target_resource)))})"
)
elif len(get_containers(workload=target_resource)) > 1:
raise CheckError(
"Container name must be configured for target application with multiple containers"
)
@@ -261,7 +262,6 @@ async def measure(
target_metrics = [
m for m in self.config.metrics_to_collect if m.value in metrics
]
target_resource = await _get_target_resource(self.config)

progress_duration = servo.Duration(control.warmup + control.duration)
progress = servo.EventProgress(timeout=progress_duration)
@@ -286,7 +286,6 @@

try:
await self.periodic_measure(
target_resource=target_resource,
target_metrics=target_metrics,
datapoints_dicts=datapoints_dicts,
)
@@ -353,19 +352,20 @@ def _get_target_container_metrics(

async def periodic_measure(
self,
target_resource: Union[Deployment, Rollout],
target_metrics: list[SupportedKubeMetrics],
datapoints_dicts: Dict[str, Dict[str, List[DataPoint]]],
) -> None:
# Retrieve latest main state
await target_resource.refresh()
target_resource = await _get_target_resource(self.config)
target_resource_container = _get_target_resource_container(
self.config, target_resource
)

async with kubernetes_asyncio.client.api_client.ApiClient() as api:
cust_obj_api = kubernetes_asyncio.client.CustomObjectsApi(api_client=api)
label_selector_str = selector_string(target_resource.match_labels)
label_selector_str = dict_to_selector(
target_resource.spec.selector.match_labels
)
timestamp = datetime.now()

if any((m in MAIN_METRICS_REQUIRE_CUST_OBJ for m in target_metrics)):
@@ -404,8 +404,8 @@ async def periodic_measure(
value=mem_usage,
)

cpu_resources = target_resource_container.get_resource_requirements(
"cpu"
cpu_resources = ContainerHelper.get_resource_requirements(
target_resource_container, Resource.cpu.value
)
# Set requests = limits if not specified
if (
@@ -441,8 +441,8 @@ async def periodic_measure(
value=cpu_saturation,
)

mem_resources = target_resource_container.get_resource_requirements(
"memory"
mem_resources = ContainerHelper.get_resource_requirements(
target_resource_container, Resource.memory.value
)
# Set requests = limits if not specified
if (
@@ -486,40 +486,45 @@ async def periodic_measure(
datapoints_dicts=datapoints_dicts,
time=timestamp,
)
for pod in await target_resource.get_pods():
target_pods = [
pod
for pod in await PodHelper.list_pods_with_labels(
target_resource.metadata.namespace,
target_resource.spec.selector.match_labels,
)
if "tuning" not in pod.metadata.name
]
for pod in target_pods:
_append_data_point_for_time(
pod_name=pod.name,
pod_name=pod.metadata.name,
metric_name=SupportedKubeMetrics.MAIN_POD_RESTART_COUNT.value,
value=pod.restart_count,
value=PodHelper.get_restart_count(pod),
)

# Retrieve latest tuning state
target_resource_tuning_pod_name = f"{target_resource.name}-tuning"
target_resource_tuning_pod: Pod = next(
(
p
for p in await target_resource.get_pods()
if p.name == target_resource_tuning_pod_name
),
None,
)
target_resource_tuning_pod_name = f"{target_resource.metadata.name}-tuning"
try:
target_resource_tuning_pod = await PodHelper.read(
target_resource_tuning_pod_name, target_resource.metadata.namespace
)
except kubernetes_asyncio.client.exceptions.ApiException as e:
if e.status != 404 or e.reason != "Not Found":
raise
target_resource_tuning_pod = None

if target_resource_tuning_pod:
target_resource_tuning_pod_container = _get_target_resource_container(
self.config, target_resource_tuning_pod
)
cpu_resources = (
target_resource_tuning_pod_container.get_resource_requirements(
"cpu"
)
cpu_resources = ContainerHelper.get_resource_requirements(
target_resource_tuning_pod_container, Resource.cpu.value
)
# Set requests = limits if not specified
if (cpu_request := cpu_resources[ResourceRequirement.request]) is None:
cpu_request = cpu_resources[ResourceRequirement.limit]

mem_resources = (
target_resource_tuning_pod_container.get_resource_requirements(
"memory"
)
mem_resources = ContainerHelper.get_resource_requirements(
target_resource_tuning_pod_container, Resource.memory.value
)
if (mem_request := mem_resources[ResourceRequirement.request]) is None:
mem_request = mem_resources[ResourceRequirement.limit]
@@ -537,7 +542,9 @@ async def periodic_measure(
restart_count = None
if SupportedKubeMetrics.TUNING_POD_RESTART_COUNT in target_metrics:
if target_resource_tuning_pod is not None:
restart_count = target_resource_tuning_pod.restart_count
restart_count = PodHelper.get_restart_count(
target_resource_tuning_pod
)
else:
restart_count = 0

Expand All @@ -550,7 +557,7 @@ async def periodic_measure(
# TODO: (potential improvement) raise error if more than 1 tuning pod?
for pod_entry in tuning_metrics["items"]:
pod_name = pod_entry["metadata"]["name"]
if pod_name != f"{target_resource.name}-tuning":
if pod_name != target_resource_tuning_pod_name:
raise RuntimeError(f"Got unexpected tuning pod name {pod_name}")
timestamp = isoparse(pod_entry["timestamp"])
_append_data_point_for_pod = functools.partial(
@@ -669,40 +676,39 @@ def _append_data_point(

async def _get_target_resource(
config: KubeMetricsConfiguration,
) -> Union[Deployment, Rollout]:
) -> Union[V1Deployment, V1StatefulSet]:
read_args = dict(name=config.name, namespace=config.namespace)
if config.kind.lower() == "deployment":
return await Deployment.read(**read_args)
elif config.kind.lower() == "rollout":
return await Rollout.read(**read_args)
return await DeploymentHelper.read(**read_args)
elif config.kind.lower() == "statefulset":
return await StatefulSetHelper.read(**read_args)
else:
raise NotImplementedError(
f"Resource type {config.kind} is not supported by the kube-metrics connector"
)


def _get_target_resource_container(
config: KubeMetricsConfiguration, target_resource: Union[Deployment, Rollout, Pod]
) -> Container:
config: KubeMetricsConfiguration,
target_resource: Union[V1Deployment, V1StatefulSet, V1Pod],
) -> V1Container:
if config.container:
if isinstance(target_resource, Pod):
target_resource_container: Container = target_resource.get_container(
config.container
)
else:
target_resource_container: Container = target_resource.find_container(
config.container
)

target_resource_container = find_container(
workload=target_resource, name=config.container
)
if target_resource_container is None:
raise RuntimeError(
f"Unable to locate container {config.container} in {target_resource.obj.kind} {target_resource.name}"
f"Unable to locate container {config.container} in {target_resource.kind} {target_resource.metadata.name}"
)
elif len(target_resource.containers) > 1:
# TODO (improvement) can support this with ID append
raise RuntimeError(f"Unable to derive metrics for multi-container resources")
else:
target_resource_container: Container = target_resource.containers[0]
containers = get_containers(workload=target_resource)
# TODO (improvement) can support this with ID append
if len(containers) > 1:
raise RuntimeError(
f"Unable to derive metrics for multi-container resources"
)

target_resource_container = containers[0]

return target_resource_container

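Taken together, the kube-metrics changes replace the connector's stateful `Deployment`/`Rollout`/`Pod` wrappers with stateless helpers that operate directly on `kubernetes_asyncio` models (`V1Deployment`, `V1StatefulSet`, `V1Pod`): label selectors, containers, resource requirements, and restart counts are all derived from the raw objects. The sketch below strings together the helper calls that appear in this diff; the wrapper function, workload names, and printed output are illustrative assumptions, not part of the PR.

```python
import asyncio

import kubernetes_asyncio.config

from servo.connectors.kubernetes_helpers import (
    ContainerHelper,
    DeploymentHelper,
    PodHelper,
    dict_to_selector,
    find_container,
)
from servo.types import Resource, ResourceRequirement


async def describe_workload(name: str, namespace: str, container: str) -> None:
    # Load a local kubeconfig for out-of-cluster use (in-cluster setup differs).
    await kubernetes_asyncio.config.load_kube_config()

    # Helpers return plain kubernetes_asyncio models (here a V1Deployment).
    deployment = await DeploymentHelper.read(name, namespace)

    # Label selectors are built straight from the workload spec.
    selector = dict_to_selector(deployment.spec.selector.match_labels)
    print(f"label selector: {selector}")

    # Containers are located by name on the raw workload object.
    target = find_container(workload=deployment, name=container)
    if target is None:
        raise RuntimeError(f"container {container} not found in {name}")

    # Resource requirements come back keyed by ResourceRequirement (request/limit);
    # mirror the connector's fallback of treating a missing request as the limit.
    cpu = ContainerHelper.get_resource_requirements(target, Resource.cpu.value)
    cpu_request = cpu[ResourceRequirement.request] or cpu[ResourceRequirement.limit]
    print(f"cpu request: {cpu_request}")

    # Pods are listed by the same labels and inspected through PodHelper.
    pods = await PodHelper.list_pods_with_labels(
        namespace, deployment.spec.selector.match_labels
    )
    for pod in pods:
        print(pod.metadata.name, PodHelper.get_restart_count(pod))


if __name__ == "__main__":
    # Hypothetical invocation against a deployment named "web" in "default".
    asyncio.run(describe_workload("web", "default", "main"))
```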