Skip to content

Commit

Permalink
adding start of node status
Browse files Browse the repository at this point in the history
Signed-off-by: Paige Patton <[email protected]>
  • Loading branch information
paigerube14 authored and chaitanyaenr committed Jan 23, 2025
1 parent 1a782da commit a07496b
Show file tree
Hide file tree
Showing 11 changed files with 292 additions and 22 deletions.
3 changes: 2 additions & 1 deletion src/krkn_lib/elastic/krkn_elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ def push_telemetry(self, telemetry: ChaosRunTelemetry, index: str):
time_start = time.time()
elastic_chaos.save(using=self.es, index=index)
return int(time.time() - time_start)
except Exception:
except Exception as e:
self.safe_logger.info("Elastic push telemetry error: " + str(e))
return -1

def search_telemetry(self, run_uuid: str, index: str):
Expand Down
12 changes: 9 additions & 3 deletions src/krkn_lib/k8s/krkn_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
import tempfile
import threading
import time
from urllib.parse import urlparse

import warnings
from concurrent.futures import ThreadPoolExecutor, wait
from functools import partial
from queue import Queue
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import arcaflow_lib_kubernetes
import kubernetes
Expand All @@ -29,6 +28,7 @@

from krkn_lib.models.k8s import (
PVC,
AffectedNode,
AffectedPod,
ApiRequestException,
Container,
Expand Down Expand Up @@ -1729,7 +1729,9 @@ def find_kraken_node(self) -> str:
raise e
return node_name

def watch_node_status(self, node: str, status: str, timeout: int):
def watch_node_status(
self, node: str, status: str, timeout: int, affected_node: AffectedNode
):
"""
Watch for a specific node status
Expand All @@ -1739,6 +1741,7 @@ def watch_node_status(self, node: str, status: str, timeout: int):
:param resource_version: version of the resource
"""
count = timeout
timer_start = time.time()
for event in self.watch_resource.stream(
self.cli.list_node,
field_selector=f"metadata.name={node}",
Expand All @@ -1761,6 +1764,9 @@ def watch_node_status(self, node: str, status: str, timeout: int):
)
if not count:
self.watch_resource.stop()
end_time = time.time()
affected_node.set_affected_node_status(status, end_time - timer_start)
return affected_node

#
# TODO: Implement this with a watcher instead of polling
Expand Down
23 changes: 21 additions & 2 deletions src/krkn_lib/models/elastic/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ def __init__(


# Telemetry models


class ElasticAffectedPod(InnerDoc):
pod_name = Text(fields={"keyword": Keyword()})
namespace = Text()
Expand All @@ -87,6 +85,15 @@ class ElasticPodsStatus(InnerDoc):
error = Text()


class ElasticAffectedNodes(InnerDoc):
node_name = Text(fields={"keyword": Keyword()})
not_ready_time = Float()
ready_time = Float()
stopped_time = Float()
running_time = Float()
terminating_time = Float()


class ElasticScenarioParameters(InnerDoc):
pass

Expand All @@ -99,6 +106,7 @@ class ElasticScenarioTelemetry(InnerDoc):
parameters_base64 = Text()
parameters = Nested(ElasticScenarioParameters)
affected_pods = Nested(ElasticPodsStatus)
affected_nodes = Nested(ElasticAffectedNodes, multi=True)


class ElasticNodeInfo(InnerDoc):
Expand Down Expand Up @@ -167,6 +175,17 @@ def __init__(
],
error=sc.affected_pods.error,
),
affected_nodes=[
ElasticAffectedNodes(
node_name=node.node_name,
not_ready_time=node.not_ready_time,
ready_time=node.ready_time,
stopped_time=node.stopped_time,
running_time=node.running_time,
terminating_time=node.terminating_time,
)
for node in sc.affected_nodes
],
)
for sc in chaos_run_telemetry.scenarios
]
Expand Down
131 changes: 130 additions & 1 deletion src/krkn_lib/models/k8s/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ def __init__(self, json_object: str = None):
self.recovered = []
self.unrecovered = []
self.error = None

if json_object:
for recovered in json_object["recovered"]:
self.recovered.append(
Expand Down Expand Up @@ -213,6 +212,7 @@ def merge(self, pods_status: "PodsStatus"):
self.unrecovered.append(unrecovered)



class PodsMonitorThread:
executor: ThreadPoolExecutor
future: Future
Expand All @@ -235,6 +235,135 @@ def join(self, timeout: int = 120) -> PodsStatus:
return pods_status


class AffectedNode:
"""
A node affected by a chaos scenario
"""

node_name: str
"""
Name of the node
"""
ready_time: float
"""
Amount of time the node took to get to a ready state
"""
not_ready_time: float
"""
Amount of time the node took to get to a not ready state
"""
stopped_time: float
"""
Amount of time the cloud provider took to stop a node
"""
running_time: float
"""
Amount of time the cloud provider took to get a node running
"""
terminating_time: float

def __init__(
self,
node_name: str = "",
not_ready_time: float = 0,
ready_time: float = 0,
stopped_time: float = 0,
running_time: float = 0,
terminating_time: float = 0,
json_object: str = None,
):
self.node_name = node_name
self.not_ready_time = float(not_ready_time)
self.ready_time = float(ready_time)
self.stopped_time = float(stopped_time)
self.running_time = float(running_time)
self.terminating_time = float(terminating_time)

if json_object:
print("json object" + str(json_object))
self.node_name = json_object["node_name"]
self.set_not_ready_time(json_object["not_ready_time"])
self.set_ready_time(json_object["ready_time"])
self.set_cloud_stopping_time(json_object["stopped_time"])
self.set_cloud_running_time(json_object["running_time"])
self.set_terminating_time(json_object["terminating_time"])

def set_affected_node_status(self, status: str, total_time: float):
print("affected node status" + str(status))
if status == "Unknown":
self.set_not_ready_time(total_time)
elif status == "True":
self.set_ready_time(total_time)
elif status == "False":
self.set_not_ready_time(total_time)
elif status.lower() == "running":
self.set_cloud_running_time(total_time)
elif status.lower() == "stopped":
self.set_cloud_stopping_time(total_time)
elif status.lower() == "terminated":
self.set_terminating_time(total_time)

def set_not_ready_time(self, not_ready_time):
self.not_ready_time += float(not_ready_time)

def set_ready_time(self, ready_time):
self.ready_time += float(ready_time)

def set_cloud_stopping_time(self, stopped_time):
self.stopped_time += float(stopped_time)

def set_cloud_running_time(self, running_time):
self.running_time += float(running_time)

def set_terminating_time(self, terminating_time):
self.terminating_time += float(terminating_time)


class AffectedNodeStatus:
"""
Return value of wait_for_pods_to_become_ready_by_label and
wait_for_pods_to_become_ready_by_name_pattern containing the list
of the pods that did recover (pod_name, namespace,
time needed to become ready) and the list of pods that did
not recover from the chaos
"""

affected_nodes: list[AffectedNode]

def __init__(self):
self.affected_nodes = []

def merge_affected_nodes(self):
counter = 0
match_found = []
for affected_node in self.affected_nodes:
counter2 = counter + 1
for aff_node2 in self.affected_nodes[counter + 1:]: # fmt: skip
if affected_node.node_name == aff_node2.node_name:
match_found.append(counter2)
cur_node = self.affected_nodes[counter]
cur_node.set_not_ready_time(aff_node2.not_ready_time)
cur_node.set_ready_time(aff_node2.ready_time)
cur_node.set_cloud_stopping_time(aff_node2.stopped_time)
cur_node.set_cloud_running_time(aff_node2.running_time)
cur_node.set_terminating_time(aff_node2.terminating_time)
self.affected_nodes[counter] = cur_node
break
counter2 += 1
counter += 1

for item in reversed(match_found):
self.affected_nodes.pop(item)

def get_affected_node_index(self, node_name):
counter = 0

for affected_node in self.affected_nodes:
if affected_node.node_name == node_name:
return self.affected_nodes[counter]
counter += 1


class ServiceHijacking:
pod_name: str
namespace: str
Expand Down
26 changes: 20 additions & 6 deletions src/krkn_lib/models/telemetry/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import yaml

from krkn_lib.models.k8s import PodsStatus
from krkn_lib.models.k8s import AffectedNode, PodsStatus

relevant_event_reasons: frozenset[str] = frozenset(
[
Expand Down Expand Up @@ -77,6 +77,10 @@ class ScenarioTelemetry:
"""
Pods affected by the chaos scenario
"""
affected_nodes: list[AffectedNode]
"""
Nodes affected by the chaos scenario
"""
cluster_events: list[ClusterEvent]
"""
Cluster events collected during the chaos run
Expand Down Expand Up @@ -104,6 +108,17 @@ def __init__(self, json_object: any = None):
self.affected_pods = PodsStatus(
json_object=json_object.get("affected_pods")
)

if json_object.get("affected_nodes") and isinstance(
json_object.get("affected_nodes"), list
):
self.affected_nodes = [
AffectedNode(json_object=node)
for node in json_object.get("affected_nodes")
]
else:
self.affected_nodes = []

if json_object.get("cluster_events") and isinstance(
json_object.get("cluster_events"), list
):
Expand Down Expand Up @@ -143,6 +158,7 @@ def __init__(self, json_object: any = None):
self.parameters_base64 = ""
self.parameters = {}
self.affected_pods = PodsStatus()
self.affected_nodes = []
self.cluster_events = []

def to_json(self) -> str:
Expand Down Expand Up @@ -318,7 +334,7 @@ def __init__(
# This parses CoreV1Event
# (https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Event.md)
self.name = k8s_obj.metadata.name
self.creation = k8s_obj.metadata.creation_timestamp
self.creation = str(k8s_obj.metadata.creation_timestamp)
self.reason = k8s_obj.reason
self.message = k8s_obj.message
self.namespace = k8s_obj.metadata.namespace
Expand All @@ -330,7 +346,7 @@ def __init__(

if k8s_json_dict:
self.name = k8s_json_dict["metadata"]["name"]
self.creation = k8s_json_dict["metadata"]["creationTimestamp"]
self.creation = str(k8s_json_dict["metadata"]["creationTimestamp"])
self.reason = k8s_json_dict["reason"]
self.message = k8s_json_dict["message"]
self.namespace = k8s_json_dict["metadata"]["namespace"]
Expand All @@ -345,7 +361,7 @@ def __init__(
if json_dict:
self.name = json_dict["name"]
self.namespace = json_dict["namespace"]
self.creation = json_dict["creation"]
self.creation = str(json_dict["creation"])
self.reason = json_dict["reason"]
self.message = json_dict["message"]
self.source_component = json_dict["source_component"]
Expand Down Expand Up @@ -416,8 +432,6 @@ class ChaosRunTelemetry:
Current time stamp of run
"""

affected_pods: PodsStatus = PodsStatus()

def __init__(self, json_dict: any = None):
self.scenarios = list[ScenarioTelemetry]()
self.node_summary_infos = list[NodeInfo]()
Expand Down
10 changes: 10 additions & 0 deletions src/krkn_lib/tests/base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,16 @@ def get_ChaosRunTelemetry_json(self, run_uuid: str) -> dict:
],
"error": "some error",
},
"affected_nodes": [
{
"node_name": "kind-control-plane",
"ready_time": 2.71,
"not_ready_time": 3.14,
"stopped_time": 0,
"running_time": 0,
"terminating_time": 0,
}
],
}
],
"node_summary_infos": [
Expand Down
Loading

0 comments on commit a07496b

Please sign in to comment.