vdk-plugins: Audit log statements #2878

Merged · 4 commits · Nov 9, 2023
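The change audits log statements across the vdk-plugins: module-level loggers are consistently named `log` (several files previously used `logger` or `_log`), and routine progress messages are demoted from `info` to `debug`, while warnings and errors keep their levels. A minimal sketch of the resulting convention, using only the standard library — the function and message names below are illustrative, not code from this PR:

```python
import logging

# One module-level logger per file, consistently named `log`.
log = logging.getLogger(__name__)


def report_job_status(job_execution_id: str, job_status: str) -> None:
    # Routine progress/status messages are emitted at DEBUG level...
    log.debug(f"Current status of job execution {job_execution_id} is: {job_status}.")

    # ...while warnings and errors keep their original levels.
    if job_status == "failed":
        log.warning(f"Job execution {job_execution_id} reported status 'failed'.")
```

To surface the demoted messages while debugging locally, raising the log level with standard-library configuration (for example `logging.basicConfig(level=logging.DEBUG)`, assuming no other logging setup is in place) will show them; the logger names follow the module paths passed to `logging.getLogger(__name__)`.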
@@ -60,7 +60,7 @@ def poke(self, context: Dict[Any, Any]) -> bool:
job_execution = vdk_hook.get_job_execution_status(self.job_execution_id)
job_status = job_execution.status

log.info(
log.debug(
f"Current status of job execution {self.job_execution_id} is: {job_status}."
)

@@ -13,7 +13,7 @@
from vdk.plugin.audit.audit_config import AuditConfiguration


logger = logging.getLogger(__name__)
log = logging.getLogger(__name__)


class AuditPlugin:
@@ -36,13 +36,13 @@ def _audit(event, args):
event == not_permitted_event
for not_permitted_event in forbidden_events_list
):
logger.warning(
log.warning(
f'[Audit] Detected FORBIDDEN operation "{event}" with '
f'arguments "{args}" '
)

if self._config.exit_on_forbidden_event():
logger.error(
log.error(
f"[Audit] Terminating the data job due to the FORBIDDEN "
f'operation "{event}" with arguments "{args}" '
)
@@ -43,7 +43,7 @@ def validate(self, jobs: List[Dict]):
for job in jobs:
self._validate_job(job)
self._check_dag_cycles(jobs)
log.info("Successfully validated the DAG!")
log.debug("Successfully validated the DAG!")

def _raise_error(
self, error_type: str, reason: str, countermeasures: str, jobs: List[str] = ""
@@ -81,7 +81,7 @@ def _validate_job(self, job: Dict):
self._validate_fail_dag_on_error(job_name, job["fail_dag_on_error"])
if "arguments" in job:
self._validate_arguments(job_name, job["arguments"])
log.info(f"Successfully validated job: {job_name}")
log.debug(f"Successfully validated job: {job_name}")

def _validate_job_type(self, job: Dict):
if not isinstance(job, dict):
@@ -41,7 +41,7 @@ def pre_ingest_process(

@hookimpl
def initialize_job(self, context: JobContext) -> None:
log.info("Initialize data job with GDP Execution ID Plugin.")
log.debug("Initialize data job with GDP Execution ID Plugin.")
self._plugin_config = GdpExecutionIdPluginConfiguration(
context.core_context.configuration
)
@@ -9,7 +9,7 @@
ManagedConnectionBase,
)

_log = logging.getLogger(__name__)
log = logging.getLogger(__name__)


class GreenplumConnection(ManagedConnectionBase):
@@ -72,7 +72,7 @@ def __init__(
Using *async*=True an asynchronous connection will be created. *async_* is
a valid alias (for Python versions where ``async`` is a keyword).
"""
super().__init__(_log)
super().__init__(log)

self._dsn = dsn
self._connection_factory = connection_factory
@@ -87,7 +87,7 @@ def __init__(
dsn_message_optional = ""
if self._dsn:
dsn_message_optional = f"dsn: {dsn}, "
_log.debug(
log.debug(
f"Creating new Greenplum connection for {dsn_message_optional}"
f"user: {user} to [host:port/dbname]: {host}:{port}/{dbname}"
)
@@ -11,7 +11,7 @@
from vdk.internal.core import errors
from vdk.plugin.greenplum.greenplum_connection import GreenplumConnection

_log = logging.getLogger(__name__)
log = logging.getLogger(__name__)


class IngestToGreenplum(IIngesterPlugin):
@@ -34,7 +34,7 @@ def ingest_payload(
See parent class doc for details
"""

_log.info(
log.info(
f"Ingesting payloads to table: {destination_table} in Greenplum database; "
f"collection_id: {collection_id}"
)
@@ -50,7 +50,7 @@ def ingest_payload(
try:
cursor.execute(query, parameters)
connection.commit()
_log.debug("Payload was ingested.")
log.debug("Payload was ingested.")
except Exception as e:
errors.log_and_rethrow(errors.find_whom_to_blame_from_exception(e), e)

@@ -41,7 +41,7 @@ def initialize_job(self, context: JobContext) -> None:
token = context.core_context.configuration.get_value(HUGGINGFACE_TOKEN)

if token:
log.info("huggingface log in", extra={"huggingface_repo_id": repo_id})
log.debug("huggingface log in", extra={"huggingface_repo_id": repo_id})
huggingface_hub.login(token)

self._ingester = IngestToHuggingface(repo_id)
@@ -7,7 +7,7 @@
ManagedConnectionBase,
)

_log = logging.getLogger(__name__)
log = logging.getLogger(__name__)


class ImpalaConnection(ManagedConnectionBase):
@@ -29,7 +29,7 @@ def __init__(
auth_cookie_names=None,
retries=3,
):
super().__init__(_log)
super().__init__(log)

self._host = host
self._port = port
@@ -72,7 +72,7 @@ def handle_error(
# try to handle multiple failed to open file errors in one query for different tables
current_exception = caught_exception
while recovery_cursor.get_retries() < self._num_retries:
self._log.info(
self._log.debug(
f"Try ({(recovery_cursor.get_retries() + 1)} of {self._num_retries}) "
f"to handle exception {current_exception}"
)
@@ -125,7 +125,6 @@ def _handle_should_not_retry_error(self, exception) -> bool:
self._log.info(
"Execution of query exceeded impala time limit. "
"This is most likely because the query is resource heavy and needs optimisation. "
"The query would not be retried, in order to avoid increasing the load on the database."
)
return True

@@ -142,7 +141,7 @@ def _handle_hdfs_failed_to_open_file_error(
regex = ".*/user/hive/warehouse/([^/]*).db/([^/]*)"
matcher = re.compile(pattern=regex, flags=re.DOTALL)
results = matcher.search(str(exception).strip())
self._log.info(
self._log.debug(
"Detected query failing with Failed to find file error. WIll try to autorecover."
)
if results and len(results.groups()) == 2:
@@ -143,7 +143,7 @@ def db_connection_recover_operation(self, recovery_cursor: RecoveryCursor) -> No
if impala_error_handler.handle_error(
recovery_cursor.get_exception(), recovery_cursor
):
logging.getLogger(__name__).info(
logging.getLogger(__name__).debug(
"Error handled successfully! Query execution has succeeded."
)
else:
@@ -75,8 +75,8 @@ def finalize(self):
self.job_input = None
else:
log.warning(
"You are trying to finalize a job that is not existing!\n"
"Initialize a job using VDK.get_initialized_job_input() first, please! "
"You are trying to finalize a job that does not exist.\n"
"Initialize a job using VDK.get_initialized_job_input() first. "
)


@@ -81,15 +81,15 @@ def prepare_result_cell_output(result_df):
except ImportError:
log.info(
"ipyaggrid is not installed. "
"If installed result would be formatted in a interactive grid."
"If installed result would be formatted in an interactive grid."
)
return result_df
except Exception as e:
log.error(
"There was an issue rendering the query result output using ipyaggrid. "
f"Error was {str(e)} "
"Consider reinstall the package if needed."
"Fall back to default table visualization"
"Consider reinstalling the package if needed."
"Falling back to default table visualization"
)
return result_df

@@ -29,7 +29,6 @@ def do_GET(self):
Otherwise, a 404 error is sent as the response.
"""
if self.path == "/threads":
log.info("Dumping threads")
self.send_response(200)
self.send_header("Content-type", "text/plain]")
self.end_headers()
@@ -20,16 +20,16 @@

class OAuth2Handler(APIHandler):
def initialize(self, vdk_config: VdkJupyterConfig):
log.info(f"VDK config: {vdk_config.__dict__}")
log.debug(f"VDK config: {vdk_config.__dict__}")

self._authorization_url = vdk_config.oauth2_authorization_url
self._access_token_url = vdk_config.oauth2_token_url
self._client_id = vdk_config.oauth2_client_id
self._redirect_url = vdk_config.oauth2_redirect_url

log.info(f"Authorization URL: {self._authorization_url}")
log.info(f"Access Token URL: {self._access_token_url}")
# log.info(f"client_id: {self._client_id}")
log.debug(f"Authorization URL: {self._authorization_url}")
log.debug(f"Access Token URL: {self._access_token_url}")
# log.debug(f"client_id: {self._client_id}")

# No client secret. We use only native app workflow with PKCE (RFC 7636)

@@ -61,17 +61,17 @@ def get(self):
redirect_url = self.request.full_url()
redirect_url = self._fix_localhost(redirect_url)

log.info(f"redirect uri is {redirect_url}")
log.debug(f"redirect uri is {redirect_url}")

if self.get_argument("code", None):
log.info(
log.debug(
"Authorization code received. Will generate access token using authorization code."
)
tokens = self._exchange_auth_code_for_access_token(redirect_url)
log.info(f"Got tokens data: {tokens}") # TODO: remove this
log.debug(f"Got tokens data: {tokens}") # TODO: remove this
self._persist_tokens_data(tokens)
else:
log.info(f"Authorization URL is: {self._authorization_url}")
log.debug(f"Authorization URL is: {self._authorization_url}")
full_authorization_url = self._prepare_authorization_code_request_url(
redirect_url
)
@@ -39,7 +39,6 @@ def __attempt_kerberos_authentication(
f"The error was: {e} "
"This means potential future operations requiring "
"valid kerberos authentication will fail. "
"You may need to inspect the above error and try to fix it if that happens. "
"If no kerberos related operations are done, ignore this warning."
"If you prefer to fail fast (instead of this warning), enable krb_auth_fail_fast flag."
"See vdk config-help for more information."
@@ -44,21 +44,21 @@ def __configure_current_os_process_to_use_own_kerberos_credentials_cache(self):
self._oldKRB5CCNAME = os.environ[
"KRB5CCNAME"
] # used for deleting the env variable later
log.info(f"KRB5CCNAME was already set to {self._oldKRB5CCNAME}")
log.debug(f"KRB5CCNAME was already set to {self._oldKRB5CCNAME}")
except KeyError:
tmpfile = tempfile.NamedTemporaryFile(prefix="vdkkrb5cc", delete=True).name
os.environ["KRB5CCNAME"] = "FILE:" + tmpfile
log.info(f"KRB5CCNAME is set to a new file {tmpfile}")
log.debug(f"KRB5CCNAME is set to a new file {tmpfile}")
self._tmp_file = tmpfile

def __restore_previous_os_process_kerberos_credentials_cache_configuration(self):
if hasattr(self, "_oldKRB5CCNAME"):
os.environ["KRB5CCNAME"] = self._oldKRB5CCNAME
log.info(f"KRB5CCNAME is restored to {self._oldKRB5CCNAME}")
log.debug(f"KRB5CCNAME is restored to {self._oldKRB5CCNAME}")
del self._oldKRB5CCNAME
else:
del os.environ["KRB5CCNAME"]
log.info("KRB5CCNAME is now unset")
log.debug("KRB5CCNAME is now unset")
try:
os.remove(self._tmp_file)
except OSError:
@@ -42,7 +42,7 @@ def validate(self, jobs: List[Dict]):
for job in jobs:
self._validate_job(job)
self._check_dag_cycles(jobs)
log.info("Successfully validated the DAG!")
log.debug("Successfully validated the DAG!")

def _raise_error(
self, error_type: str, reason: str, countermeasures: str, jobs: List[str] = ""
@@ -82,7 +82,7 @@ def _validate_job(self, job: Dict):
)
if "arguments" in job:
self._validate_arguments(job_name, job["arguments"])
log.info(f"Successfully validated job: {job_name}")
log.debug(f"Successfully validated job: {job_name}")

def _validate_job_type(self, job: Dict):
if not isinstance(job, dict):
@@ -10,7 +10,7 @@
from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.core import errors

_log = logging.getLogger(__name__)
log = logging.getLogger(__name__)


class IngestToPostgres(IIngesterPlugin):
@@ -33,7 +33,7 @@ def ingest_payload(
See parent class doc for details
"""

_log.info(
log.info(
f"Ingesting payloads to table: {destination_table} in database; "
f"collection_id: {collection_id}"
)
@@ -49,7 +49,7 @@ def ingest_payload(
try:
cursor.execute(query, parameters)
connection.commit()
_log.debug("Payload was ingested.")
log.debug("Payload was ingested.")
except Exception as e:
errors.report_and_rethrow(
errors.find_whom_to_blame_from_exception(e), e
@@ -106,7 +106,7 @@ def post_ingest_process(

@hookimpl(tryfirst=True)
def initialize_job(self, context: JobContext) -> None:
log.info("Initialize data job with ConvertPayloadValuesToString Plugin.")
log.debug("Initialize data job with ConvertPayloadValuesToString Plugin.")

context.ingester.add_ingester_factory_method("add-payload-size", lambda: self)

@@ -124,6 +124,6 @@ def ingest_payload(

@hookimpl(tryfirst=True)
def initialize_job(self, context: JobContext) -> None:
log.info("Initialize data job with DummyIngestion Plugin.")
log.debug("Initialize data job with DummyIngestion Plugin.")

context.ingester.add_ingester_factory_method("dummy-ingest", lambda: self)
@@ -221,6 +221,6 @@ def ingest_payload(

@hookimpl
def initialize_job(self, context: JobContext) -> None:
log.info("Initialize data job with IngestIntoMemory Plugin.")
log.debug("Initialize data job with IngestIntoMemory Plugin.")

context.ingester.add_ingester_factory_method(self.method_name, lambda: self)