Skip to content

Commit

Permalink
pylint & format
Browse files Browse the repository at this point in the history
  • Loading branch information
ulixius9 committed Jan 3, 2024
1 parent 6dfcc4a commit c31d927
Showing 1 changed file with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.workflow_output_handler import print_status

# pylint: disable=ungrouped-imports
try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
Expand Down Expand Up @@ -141,7 +142,8 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource:
type=service_type,
serviceName=ingestion_pipeline.service.name,
sourceConfig=ingestion_pipeline.sourceConfig,
serviceConnection=None, # retrieved from the test suite workflow using the `sourceConfig.config.entityFullyQualifiedName`
# retrieved from the test suite workflow using the `sourceConfig.config.entityFullyQualifiedName`
serviceConnection=None,
)

if entity_class is None:
Expand Down Expand Up @@ -302,8 +304,8 @@ def send_failed_status_callback(workflow_config: OpenMetadataWorkflowConfig, _):
"Workflow config does not have ingestionPipelineFQN informed. We won't update the status."
)


class CustomPythonOperator(PythonOperator):

def on_kill(self) -> None:
"""
Override this method to clean up subprocesses when a task instance
Expand All @@ -315,6 +317,7 @@ def on_kill(self) -> None:
if workflow_config:
send_failed_status_callback(workflow_config, None)


def build_dag(
task_name: str,
ingestion_pipeline: IngestionPipeline,
Expand All @@ -337,7 +340,9 @@ def build_dag(
# There's no need to retry if we have had an error. Wait until the next schedule or manual rerun.
retries=ingestion_pipeline.airflowConfig.retries or 0,
# each DAG will call its own OpenMetadataWorkflowConfig
on_failure_callback=partial(send_failed_status_callback, workflow_config, None),
on_failure_callback=partial(
send_failed_status_callback, workflow_config, None
),
# Add tag and ownership to easily identify DAGs generated by OM
owner=ingestion_pipeline.owner.name
if ingestion_pipeline.owner
Expand Down

0 comments on commit c31d927

Please sign in to comment.