Commit
Merge pull request #462 from opsani/feature/kubernetes-connector-refactor

Feature/kubernetes connector refactor
linkous8 authored Oct 5, 2022
2 parents 74ff311 + d65dc66 commit af53d05
Showing 24 changed files with 2,593 additions and 5,715 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docker.yaml
@@ -20,7 +20,7 @@ jobs:
should_skip: ${{ steps.skip_check.outputs.should_skip }}
steps:
- id: skip_check
uses: fkirc/skip-duplicate-actions@v4
uses: fkirc/skip-duplicate-actions@master
with:
concurrent_skipping: 'same_content'
paths_ignore: '["**/README.md", "**/docs/**", "CHANGELOG.md"]'
28 changes: 9 additions & 19 deletions servo/cli.py
@@ -28,6 +28,7 @@
import click
import devtools
import kubernetes_asyncio
import kubernetes_asyncio.config
import loguru
import pydantic
import pygments
@@ -44,6 +45,8 @@
import servo.utilities.yaml
import servo.utilities.strings

from servo.connectors.kubernetes_helpers import DeploymentHelper


class Section(str, enum.Enum):
assembly = "Assembly Commands"
@@ -1557,34 +1560,21 @@ def inject_sidecar(

if target.startswith("deploy"):
deployment = run_async(
servo.connectors.kubernetes.Deployment.read(
target.split("/", 1)[1], namespace
)
DeploymentHelper.read(target.split("/", 1)[1], namespace)
)
run_async(
deployment.inject_sidecar(
"opsani-envoy", image, service=service, port=port
DeploymentHelper.inject_sidecar(
deployment, "opsani-envoy", image, service=service, port=port
)
)
typer.echo(
f"Envoy sidecar injected to Deployment {deployment.name} in {namespace}"
f"Envoy sidecar injected to Deployment {deployment.metadata.name} in {namespace}"
)

elif target.startswith("rollout"):
rollout = run_async(
servo.connectors.kubernetes.Rollout.read(
target.split("/", 1)[1], namespace
)
)
run_async(
rollout.inject_sidecar(
"opsani-envoy", image, service=service, port=port
)
)
typer.echo(
f"Envoy sidecar injected to Rollout {rollout.name} in {namespace}"
raise typer.BadParameter(
"Rollout sidecar injection is not yet implemented"
)

elif target.startswith("pod"):
raise typer.BadParameter("Pod sidecar injection is not yet implemented")
else:
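For orientation, here is a minimal sketch (not part of this diff) of how the refactored, stateless DeploymentHelper calls above might be exercised outside the CLI. The namespace, deployment name, image, and service values are placeholders, and a local kubeconfig is assumed:

import asyncio

import kubernetes_asyncio.config

from servo.connectors.kubernetes_helpers import DeploymentHelper


async def main() -> None:
    # Assumes a local kubeconfig; in-cluster configuration would be loaded differently.
    await kubernetes_asyncio.config.load_kube_config()

    # Read the target Deployment via the helper (replaces Deployment.read in the old API).
    deployment = await DeploymentHelper.read("web", "default")  # placeholder name/namespace

    # The helper is stateless, so the workload object is passed in explicitly.
    await DeploymentHelper.inject_sidecar(
        deployment,
        "opsani-envoy",
        "opsani/envoy-proxy:latest",  # placeholder image
        service="web",  # placeholder service name
        port=None,
    )
    print(f"Envoy sidecar injected to Deployment {deployment.metadata.name}")


asyncio.run(main())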
172 changes: 89 additions & 83 deletions servo/connectors/kube_metrics.py
@@ -14,21 +14,26 @@

import servo
from servo.checks import CheckError
from servo.connectors.kubernetes_helpers import (
dict_to_selector,
find_container,
get_containers,
ContainerHelper,
DeploymentHelper,
PodHelper,
StatefulSetHelper,
)
from servo.connectors.kubernetes import (
Container,
Deployment,
DNSSubdomainName,
Core,
PermissionSet,
Pod,
ResourceRequirement,
Rollout,
selector_string,
ShortByteSize,
)
from servo.types import DataPoint, Metric, TimeSeries
import servo.types
from servo.types import DataPoint, Metric, TimeSeries, Resource, ResourceRequirement

import kubernetes_asyncio.client
from kubernetes_asyncio.client import V1Container, V1Deployment, V1Pod, V1StatefulSet
import kubernetes_asyncio.client.api_client
import kubernetes_asyncio.client.exceptions
import kubernetes_asyncio.config
@@ -84,8 +89,10 @@ class KubeMetricsConfiguration(servo.BaseConfiguration):
description="Namespace of the target resource"
)
name: str = pydantic.Field(description="Name of the target resource")
kind: pydantic.constr(regex=r"^([Dd]eployment|[Rr]ollout)$") = pydantic.Field(
default="Deployment", description="Kind of the target resource"
kind: str = pydantic.Field(
default="Deployment",
description="Kind of the target resource",
regex=r"^([Dd]eployment|[Ss]tateful[Ss]et)$",
)
container: Optional[str] = pydantic.Field(
default=None, description="Name of the target resource container"
@@ -141,22 +148,18 @@ async def check_metrics_api_permissions(self) -> None:
for permission in KUBERNETES_PERMISSIONS:
for resource in permission.resources:
for verb in permission.verbs:
attributes = (
kubernetes_asyncio.client.models.V1ResourceAttributes(
namespace=self.config.namespace,
group=permission.group,
resource=resource,
verb=verb,
)
attributes = kubernetes_asyncio.client.V1ResourceAttributes(
namespace=self.config.namespace,
group=permission.group,
resource=resource,
verb=verb,
)

spec = kubernetes_asyncio.client.models.V1SelfSubjectAccessReviewSpec(
spec = kubernetes_asyncio.client.V1SelfSubjectAccessReviewSpec(
resource_attributes=attributes
)
review = (
kubernetes_asyncio.client.models.V1SelfSubjectAccessReview(
spec=spec
)
review = kubernetes_asyncio.client.V1SelfSubjectAccessReview(
spec=spec
)
access_review = await v1.create_self_subject_access_review(
body=review
@@ -171,7 +174,9 @@ async def check_metrics_api(self) -> None:
async with kubernetes_asyncio.client.api_client.ApiClient() as api:
cust_obj_api = kubernetes_asyncio.client.CustomObjectsApi(api_client=api)
await cust_obj_api.list_namespaced_custom_object(
label_selector=selector_string(target_resource.match_labels),
label_selector=dict_to_selector(
target_resource.spec.selector.match_labels
),
namespace=self.config.namespace,
**METRICS_CUSTOM_OJBECT_CONST_ARGS,
)
@@ -181,17 +186,13 @@ async def check_target_containers(self) -> None:
target_resource = await _get_target_resource(self.config)
if self.config.container:
assert (
next(
(
c
for c in target_resource.containers
if c.name == self.config.container
),
None,
)
find_container(workload=target_resource, name=self.config.container)
is not None
), f"Configured container {self.config.container} was not found in target app containers ({', '.join((c.name for c in target_resource.containers))})"
elif len(target_resource.containers) > 1:
), (
f"Configured container {self.config.container} was not found in target app containers"
f" ({', '.join((c.name for c in get_containers(workload=target_resource)))})"
)
elif len(get_containers(workload=target_resource)) > 1:
raise CheckError(
"Container name must be configured for target application with multiple containers"
)
@@ -261,7 +262,6 @@ async def measure(
target_metrics = [
m for m in self.config.metrics_to_collect if m.value in metrics
]
target_resource = await _get_target_resource(self.config)

progress_duration = servo.Duration(control.warmup + control.duration)
progress = servo.EventProgress(timeout=progress_duration)
@@ -286,7 +286,6 @@

try:
await self.periodic_measure(
target_resource=target_resource,
target_metrics=target_metrics,
datapoints_dicts=datapoints_dicts,
)
@@ -353,19 +352,20 @@ def _get_target_container_metrics(

async def periodic_measure(
self,
target_resource: Union[Deployment, Rollout],
target_metrics: list[SupportedKubeMetrics],
datapoints_dicts: Dict[str, Dict[str, List[DataPoint]]],
) -> None:
# Retrieve latest main state
await target_resource.refresh()
target_resource = await _get_target_resource(self.config)
target_resource_container = _get_target_resource_container(
self.config, target_resource
)

async with kubernetes_asyncio.client.api_client.ApiClient() as api:
cust_obj_api = kubernetes_asyncio.client.CustomObjectsApi(api_client=api)
label_selector_str = selector_string(target_resource.match_labels)
label_selector_str = dict_to_selector(
target_resource.spec.selector.match_labels
)
timestamp = datetime.now()

if any((m in MAIN_METRICS_REQUIRE_CUST_OBJ for m in target_metrics)):
@@ -404,8 +404,8 @@ async def periodic_measure(
value=mem_usage,
)

cpu_resources = target_resource_container.get_resource_requirements(
"cpu"
cpu_resources = ContainerHelper.get_resource_requirements(
target_resource_container, Resource.cpu.value
)
# Set requests = limits if not specified
if (
@@ -441,8 +441,8 @@ async def periodic_measure(
value=cpu_saturation,
)

mem_resources = target_resource_container.get_resource_requirements(
"memory"
mem_resources = ContainerHelper.get_resource_requirements(
target_resource_container, Resource.memory.value
)
# Set requests = limits if not specified
if (
@@ -486,40 +486,45 @@ async def periodic_measure(
datapoints_dicts=datapoints_dicts,
time=timestamp,
)
for pod in await target_resource.get_pods():
target_pods = [
pod
for pod in await PodHelper.list_pods_with_labels(
target_resource.metadata.namespace,
target_resource.spec.selector.match_labels,
)
if "tuning" not in pod.metadata.name
]
for pod in target_pods:
_append_data_point_for_time(
pod_name=pod.name,
pod_name=pod.metadata.name,
metric_name=SupportedKubeMetrics.MAIN_POD_RESTART_COUNT.value,
value=pod.restart_count,
value=PodHelper.get_restart_count(pod),
)

# Retrieve latest tuning state
target_resource_tuning_pod_name = f"{target_resource.name}-tuning"
target_resource_tuning_pod: Pod = next(
(
p
for p in await target_resource.get_pods()
if p.name == target_resource_tuning_pod_name
),
None,
)
target_resource_tuning_pod_name = f"{target_resource.metadata.name}-tuning"
try:
target_resource_tuning_pod = await PodHelper.read(
target_resource_tuning_pod_name, target_resource.metadata.namespace
)
except kubernetes_asyncio.client.exceptions.ApiException as e:
if e.status != 404 or e.reason != "Not Found":
raise
target_resource_tuning_pod = None

if target_resource_tuning_pod:
target_resource_tuning_pod_container = _get_target_resource_container(
self.config, target_resource_tuning_pod
)
cpu_resources = (
target_resource_tuning_pod_container.get_resource_requirements(
"cpu"
)
cpu_resources = ContainerHelper.get_resource_requirements(
target_resource_tuning_pod_container, Resource.cpu.value
)
# Set requests = limits if not specified
if (cpu_request := cpu_resources[ResourceRequirement.request]) is None:
cpu_request = cpu_resources[ResourceRequirement.limit]

mem_resources = (
target_resource_tuning_pod_container.get_resource_requirements(
"memory"
)
mem_resources = ContainerHelper.get_resource_requirements(
target_resource_tuning_pod_container, Resource.memory.value
)
if (mem_request := mem_resources[ResourceRequirement.request]) is None:
mem_request = mem_resources[ResourceRequirement.limit]
@@ -537,7 +542,9 @@ async def periodic_measure(
restart_count = None
if SupportedKubeMetrics.TUNING_POD_RESTART_COUNT in target_metrics:
if target_resource_tuning_pod is not None:
restart_count = target_resource_tuning_pod.restart_count
restart_count = PodHelper.get_restart_count(
target_resource_tuning_pod
)
else:
restart_count = 0

Expand All @@ -550,7 +557,7 @@ async def periodic_measure(
# TODO: (potential improvement) raise error if more than 1 tuning pod?
for pod_entry in tuning_metrics["items"]:
pod_name = pod_entry["metadata"]["name"]
if pod_name != f"{target_resource.name}-tuning":
if pod_name != target_resource_tuning_pod_name:
raise RuntimeError(f"Got unexpected tuning pod name {pod_name}")
timestamp = isoparse(pod_entry["timestamp"])
_append_data_point_for_pod = functools.partial(
@@ -669,40 +676,39 @@ def _append_data_point(

async def _get_target_resource(
config: KubeMetricsConfiguration,
) -> Union[Deployment, Rollout]:
) -> Union[V1Deployment, V1StatefulSet]:
read_args = dict(name=config.name, namespace=config.namespace)
if config.kind.lower() == "deployment":
return await Deployment.read(**read_args)
elif config.kind.lower() == "rollout":
return await Rollout.read(**read_args)
return await DeploymentHelper.read(**read_args)
elif config.kind.lower() == "statefulset":
return await StatefulSetHelper.read(**read_args)
else:
raise NotImplementedError(
f"Resource type {config.kind} is not supported by the kube-metrics connector"
)


def _get_target_resource_container(
config: KubeMetricsConfiguration, target_resource: Union[Deployment, Rollout, Pod]
) -> Container:
config: KubeMetricsConfiguration,
target_resource: Union[V1Deployment, V1StatefulSet, V1Pod],
) -> V1Container:
if config.container:
if isinstance(target_resource, Pod):
target_resource_container: Container = target_resource.get_container(
config.container
)
else:
target_resource_container: Container = target_resource.find_container(
config.container
)

target_resource_container = find_container(
workload=target_resource, name=config.container
)
if target_resource_container is None:
raise RuntimeError(
f"Unable to locate container {config.container} in {target_resource.obj.kind} {target_resource.name}"
f"Unable to locate container {config.container} in {target_resource.kind} {target_resource.metadata.name}"
)
elif len(target_resource.containers) > 1:
# TODO (improvement) can support this with ID append
raise RuntimeError(f"Unable to derive metrics for multi-container resources")
else:
target_resource_container: Container = target_resource.containers[0]
containers = get_containers(workload=target_resource)
# TODO (improvement) can support this with ID append
if len(containers) > 1:
raise RuntimeError(
f"Unable to derive metrics for multi-container resources"
)

target_resource_container = containers[0]

return target_resource_container

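As a rough illustration (again, not part of the commit), the new stateless helpers introduced above can be combined along the same lines as the refactored periodic_measure: read the workload as a plain V1Deployment, build a label selector from its model object, and query its pods. Resource names below are placeholders and a local kubeconfig is assumed:

import asyncio

import kubernetes_asyncio.config

from servo.connectors.kubernetes_helpers import (
    DeploymentHelper,
    PodHelper,
    dict_to_selector,
)


async def report_restart_counts() -> None:
    await kubernetes_asyncio.config.load_kube_config()  # assumes a local kubeconfig

    # Read the workload with the helper, as the refactored _get_target_resource does.
    deployment = await DeploymentHelper.read("web", "default")  # placeholder name/namespace

    # The label selector now comes straight from the Kubernetes model object.
    selector = dict_to_selector(deployment.spec.selector.match_labels)
    print(f"label selector: {selector}")

    # List the main (non-tuning) pods and report their restart counts,
    # mirroring the logic in periodic_measure above.
    pods = await PodHelper.list_pods_with_labels(
        deployment.metadata.namespace,
        deployment.spec.selector.match_labels,
    )
    for pod in pods:
        if "tuning" in pod.metadata.name:
            continue
        print(pod.metadata.name, PodHelper.get_restart_count(pod))


asyncio.run(report_restart_counts())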
(Diffs for the remaining changed files are not shown.)
