Skip to content

Commit

Permalink
vdk-core: Split execution summary into chunks (#867)
Browse files Browse the repository at this point in the history
At the end of a data job execution, we log execution summary, which describes the
python and sql steps that have been executed, their status, etc. Sometimes,
when a job has a large number of steps, the execution summary will be logged as a
single large string message.

This is not an issue in most cases. However, on some operating systems, there are
limits on how large a logging message can be when printed to a terminal, or sent
through a socket, and in such situations jobs fail with `OSError: [Errno 40] Message too long`
errors triggered by the logging module.

This change checks if the message is too long, and if that is the case, splits it into
chunks. The chunks are then logged as separate messages to avoid OS-specific errors
related to logging.

Testing Done: Locally ran a data job with a large number of steps and verified that
the execution summary is split into multiple log messages.

Signed-off-by: Andon Andonov <[email protected]>
  • Loading branch information
doks5 authored Jun 21, 2022
1 parent eb991a5 commit 4c1a580
Showing 1 changed file with 40 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
# SPDX-License-Identifier: Apache-2.0
import json
import logging
import math
import os
import pathlib
from typing import cast
from typing import Dict
from typing import List
from typing import Optional

import click
Expand Down Expand Up @@ -45,6 +47,25 @@ def __validate_and_parse_args(arguments: str) -> Optional[Dict]:
wrap_in_vdk_error=True,
)

    @staticmethod
    def __split_into_chunks(exec_steps: List, chunks: int) -> List:
        """
        Lazily split a list of execution steps into ``chunks`` contiguous
        sub-lists of near-equal size.

        This is a generator: despite the ``List`` return annotation, calling
        it produces an iterator that yields one sub-list per chunk, in order.
        The first ``len(exec_steps) % chunks`` sub-lists receive one extra
        element, so sizes differ by at most one and concatenating all yielded
        sub-lists reproduces ``exec_steps`` exactly.

        :param exec_steps: the list of execution steps to partition
        :param chunks: the number of sub-lists to produce; must be > 0
            (``chunks == 0`` raises ZeroDivisionError via divmod). If
            ``chunks > len(exec_steps)``, the trailing sub-lists are empty.
        :return: a generator yielding the sub-lists
        """
        # quotient = base size of every chunk; the first `remainder`
        # chunks each get one extra element.
        quotient, remainder = divmod(len(exec_steps), chunks)
        for i in range(chunks):
            # Start offset of chunk i: the first min(i, remainder) chunks
            # have size quotient+1, the rest have size quotient.
            subsequent_iteration = (quotient + 1) * (
                i if i < remainder else remainder
            ) + quotient * (0 if i < remainder else i - remainder)
            yield exec_steps[
                subsequent_iteration : subsequent_iteration
                + (quotient + 1 if i < remainder else quotient)
            ]

def create_and_run_data_job(
self,
context: CoreContext,
Expand All @@ -62,7 +83,25 @@ def create_and_run_data_job(
execution_result = None
try:
execution_result = job.run(args)
log.info(f"Data Job execution summary: {execution_result}")

# On some platforms, if the size of a string is too large, the
# logging module starts throwing OSError: [Errno 40] Message too long,
# so it is safer if we split large strings into smaller chunks.
string_exec_result = str(execution_result)
if len(string_exec_result) > 5000:
temp_exec_result = json.loads(string_exec_result)
steps = temp_exec_result.pop("steps_list")

log.info(
f"Data Job execution summary: {json.dumps(temp_exec_result, indent=2)}"
)

chunks = math.ceil(len(string_exec_result) / 5000)
for i in self.__split_into_chunks(exec_steps=steps, chunks=chunks):
log.info(f"Execution Steps: {json.dumps(i, indent=2)}")

else:
log.info(f"Data Job execution summary: {execution_result}")
except BaseException as e:
errors.log_and_rethrow(
job_input_error_classifier.whom_to_blame(
Expand Down

0 comments on commit 4c1a580

Please sign in to comment.