Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add user experience related metrics #617

Merged
merged 1 commit into from
Nov 1, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions src/ClusterManager/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,90 @@
float("inf")),
labelnames=("fn_name",))

job_state_change_histogram = Histogram("job_state_change_latency_seconds",
"latency for job to change state(seconds)",
buckets=(1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0,
float("inf")),
labelnames=("current_state",))

class JobTimeRecord(object):
def __init__(self):
self.create_time = None
self.approve_time = None
self.submit_time = None


class LRUDefatulDict(object):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to leverage third party caching package e.g. https://github.com/tkem/cachetools/tree/c530924cdec86855be6322d3e4dd979bfc9250e4

class Node(object):
def __init__(self, key, val, next=None, prev=None):
self.key = key
self.val = val
self.next = next
self.prev = prev

def __init__(self, cap, factory):
assert cap > 0
self.m = {}
self.head = self.tail = None
self.cap = cap
self.factory = factory

def __getitem__(self, key):
return self._get(key)

def _get(self, key):
if self.m.get(key) is None:
self._put(key, self.factory())

return self.m[key].val

def _put(self, key, value):
if self.m.get(key) is not None:
node = self.m[key]
if node == self.head:
node.val = value
else:
node.prev.next = node.next
if node.next is not None:
node.next.prev = node.prev
node.prev = None
node.next = self.head
self.head.prev = node
self.head = node
else:
self.head = LRUDefatulDict.Node(key, value, self.head, None)
if self.tail is None:
self.tail = self.head
else:
self.head.next.prev = self.head
self.m[key] = self.head
if len(self.m) > self.cap:
self.m.pop(self.tail.key)
self.tail = self.tail.prev
self.tail.next = None

# pure memory data structure
job_time_recorder = LRUDefatulDict(500, lambda : JobTimeRecord())

# If previous state has no record, which means the job_manager get restarted
# or previous entry is expired, we ignore this entry.
def update_job_state_latency(job_id, state, event_time=None):
if event_time is None:
event_time = datetime.datetime.utcnow()

if state == "create":
job_time_recorder[job_id].create_time = event_time
elif state == "approve":
job_time_recorder[job_id].approve_time = event_time
if job_time_recorder[job_id].create_time is not None:
elapsed = (event_time - job_time_recorder[job_id].create_time).seconds
job_state_change_histogram.labels(state).observe(elapsed)
elif state == "submit":
job_time_recorder[job_id].submit_time = event_time
if job_time_recorder[job_id].approve_time is not None:
elapsed = (event_time - job_time_recorder[job_id].approve_time).seconds
job_state_change_histogram.labels(state).observe(elapsed)

def record(fn):
@functools.wraps(fn)
def wrapped(*args, **kwargs):
Expand Down Expand Up @@ -98,6 +182,9 @@ def ApproveJob(job, dataHandlerOri=None):
try:
job_id = job["jobId"]
vcName = job["vcName"]

update_job_state_latency(job_id, "create", event_time=job["jobTime"])

jobParams = json.loads(base64.b64decode(job["jobParams"]))
job_total_gpus = GetJobTotalGpu(jobParams)

Expand All @@ -111,6 +198,7 @@ def ApproveJob(job, dataHandlerOri=None):
detail = [{"message": "waiting for available preemptible resource."}]
dataHandler.UpdateJobTextField(job["jobId"], "jobStatusDetail", base64.b64encode(json.dumps(detail)))
dataHandler.UpdateJobTextField(job_id, "jobStatus", "queued")
update_job_state_latency(job_id, "approve")
if dataHandlerOri is None:
dataHandler.Close()
return True
Expand Down Expand Up @@ -151,6 +239,7 @@ def ApproveJob(job, dataHandlerOri=None):
detail = [{"message": "waiting for available resource."}]
dataHandler.UpdateJobTextField(job["jobId"], "jobStatusDetail", base64.b64encode(json.dumps(detail)))
dataHandler.UpdateJobTextField(job_id, "jobStatus", "queued")
update_job_state_latency(job_id, "approve")
if dataHandlerOri is None:
dataHandler.Close()
return True
Expand Down Expand Up @@ -443,6 +532,7 @@ def TakeJobActions(launcher, jobs):
try:
if sji["job"]["jobStatus"] == "queued" and (sji["allowed"] is True):
launcher.submit_job(sji["job"])
update_job_state_latency(sji["jobId"], "submit")
logging.info("TakeJobActions : submitting job : %s : %s" % (sji["jobId"], sji["sortKey"]))
elif sji["preemptionAllowed"] and (sji["job"]["jobStatus"] == "scheduling" or sji["job"]["jobStatus"] == "running") and (sji["allowed"] is False):
launcher.kill_job(sji["job"]["jobId"], "queued")
Expand Down