
vdk-core: New feature: StandaloneDataJob #793

Merged

Conversation

mrdavidlaing
Contributor

@mrdavidlaing mrdavidlaing commented Apr 7, 2022

The PR contains a WIP implementation of a vdk-core extension that addresses #791

StandaloneDataJob - a context manager that:

  • can be instantiated from code rather than via the VDK CLI
  • can be run without needing any data job files
  • gives access to an instantiated job_input object before it is finalized

Example usage:

 with StandaloneDataJobFactory.create(datajob_directory) as job_input:
      #... use job_input object to interact with SuperCollider
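For readers unfamiliar with the pattern: StandaloneDataJob follows Python's standard context-manager protocol (initialize on enter, finalize on exit). A self-contained sketch of that shape, where StandaloneJobSketch and its attributes are purely illustrative stand-ins for the real VDK internals:

```python
from types import SimpleNamespace


class StandaloneJobSketch:
    """Toy stand-in for StandaloneDataJob: initialize on enter, finalize on exit."""

    def __init__(self, directory):
        self._directory = directory
        self._finalized = False

    def __enter__(self):
        # The real implementation would run VDK's initialize/run hooks here
        # and hand back the resulting job_input object.
        self.job_input = SimpleNamespace(directory=self._directory)
        return self.job_input

    def __exit__(self, exc_type, exc_value, traceback):
        # Finalize hooks always run, even if the body raised.
        self._finalized = True
        return False  # do not swallow exceptions


with StandaloneJobSketch("/tmp/job") as job_input:
    print(job_input.directory)  # the caller works with job_input directly
```

The point of the shape is that callers never touch initialize/finalize explicitly; the `with` block guarantees cleanup.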

@antoniivanov
Collaborator

antoniivanov commented Apr 8, 2022

Thanks. I think it's heading in the right direction.

But first, what would an integration with Dagster look like using this?
If you could post a snippet or some pseudo-code, it would help us picture it better.

I am asking because
a) I want us to make sure this would work as an integration point, and
b) I would like to check whether a similar integration with other tools (like Prefect or Flyte) is possible.

I also made some small comments on the code, but we should probably settle the interfaces and expected usage first.

@mrdavidlaing
Contributor Author

mrdavidlaing commented Apr 8, 2022

Thanks. I think it's heading in the right direction.

Thanks for the quick feedback!

But first, what would an integration with Dagster look like using this? If you could post a snippet or some pseudo-code, it would help us picture it better.

The way we've approached it is to integrate with Dagster's Resource system, which basically stores an instance of the VDK job_input object on an object that Dagster passes to every op() it executes.

It looks roughly like this:

@resource(...snip...)
def supercollider_datawarehouse_resource(init_context):
    datajob_directory = Path(abspath(file_relative_path(__file__, '../')))

    datajob = NoOpStepDataJob(datajob_directory)
    try:
        datajob.initialize_job()
        vdk_job_input = datajob.run_and_return_job_input()

        yield SuperColliderDwhResource(
            vdk_job_input=vdk_job_input,
            ...snip...
        )
    finally:
        datajob.finalize_job()

Internally, SuperColliderDwhResource() is a fairly thin wrapper around vdk_job_input.get_managed_connection(), vdk_job_input.load() and vdk_job_input.send_tabular_data_for_ingestion().

An example op() using the resource looks like this:

@op(
    required_resource_keys={"datawarehouse", "daily_partition_config"}
)
def enrich_dim_org_data_tmc(context, df_fact_entitlements: fact_entitlement.DataFrame, df_dim_deployments: dim_deployment.DataFrame) -> dim_org.DataFrame:

    dim_org_ids = df_fact_entitlements.dim_org_id.append(df_dim_deployments.dim_org_id).unique().tolist()
    org_ids = extract_csp_org_ids_from_dim_org_ids(dim_org_ids)
    df_raw_data = context.resources.datawarehouse.read_sql_query(text("""
        WITH all_tmc_org_entries_for_day AS (
            SELECT
                CONCAT('CSP:', oci.organization_id)      AS dim_org_id 
     ...snip...

We have a similar set of datawarehouse_resources backed by SQLite that we use during local development & testing.
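Stripped of Dagster and VDK specifics, the initialize / yield / finalize shape of the resource above is the standard generator-based resource pattern. A self-contained sketch (FakeJob and all names here are illustrative stand-ins, not real VDK or Dagster APIs):

```python
from contextlib import contextmanager


@contextmanager
def datawarehouse_resource(job_factory):
    """Generic resource shape: set up a job, yield its job_input, always finalize."""
    job = job_factory()
    job.initialize_job()
    try:
        yield job.run_and_return_job_input()
    finally:
        # Mirrors the try/finally in the Dagster @resource above.
        job.finalize_job()


class FakeJob:
    """Illustrative stand-in for a data job; not a real VDK class."""

    def __init__(self):
        self.finalized = False

    def initialize_job(self):
        pass

    def run_and_return_job_input(self):
        return {"connection": "sqlite://"}

    def finalize_job(self):
        self.finalized = True


fake = FakeJob()
with datawarehouse_resource(lambda: fake) as job_input:
    assert job_input["connection"] == "sqlite://"
assert fake.finalized  # cleanup ran even though we never called it explicitly
```

The try/finally around the yield is what guarantees finalize_job() runs even if an op using the resource raises.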

For full details, see the internal implementation & discussion at https://gitlab.eng.vmware.com/tanzu-portfolio-insights/tanzu-dm/-/merge_requests/24

I also made some small comments on the code, but we should probably settle the interfaces and expected usage first.

Agreed. I'll loop back and address the points mentioned above once we've got the high-level structure sorted out.

@antoniivanov
Collaborator

antoniivanov commented Apr 12, 2022

The way we've approached it is to integrate with Dagster's Resource system, which basically stores an instance of the VDK job_input object on an object that Dagster passes to every op() it executes.

Sorry for taking so long. I was a bit busy, and I wanted to research Dagster a bit more.
I like the approach. It's really simple, and it's very much in line with how other, somewhat similar, tools have integrated with Dagster. I took a look at dagster-airbyte, dagster-pyspark and dagster-mlflow.

It will also make it easier to integrate with other frameworks (like Jupyter notebooks, e.g. to be able to execute data jobs through a notebook) - something I am planning to work on soon, so I'd like to re-use this.

I would love it if we could contribute a dagster-vdk integration library to Dagster. I don't know if it's something you could consider.

@antoniivanov
Collaborator

We have a similar set of datawarehouse_resources backed by SQLite that we use during local development & testing.

Oh, by the way, there's now a vdk-sqlite plugin - if you install it and set db_default_type = sqlite, you'd get a SQLite-based data job.
Not sure if it helps here, but I wanted to point it out.

@mrdavidlaing
Contributor Author

I would love it if we could contribute a dagster-vdk integration library to Dagster. I don't know if it's something you could consider.

That seems like a good long-term goal - although I'm not sure when I'd have time to contribute towards that.

Since having a way to instantiate a DataJob from code (the subject of this PR) is a necessary building block for a dagster-vdk library, could we keep this PR focused on that narrower objective?

@antoniivanov
Collaborator

I would love it if we could contribute a dagster-vdk integration library to Dagster. I don't know if it's something you could consider.

That seems like a good long-term goal - although I'm not sure when I'd have time to contribute towards that.

Since having a way to instantiate a DataJob from code (the subject of this PR) is a necessary building block for a dagster-vdk library, could we keep this PR focused on that narrower objective?

Most certainly, yes. I didn't mean to imply otherwise. It's definitely a separate effort.

@antoniivanov
Collaborator

@mrdavidlaing

To avoid ambiguity: to close the PR, I think we need the following things

  1. Finalize the naming discussion
  2. Take care of the comments on the code - the code duplication in particular is something I'd like us to fix.
  3. Create the public interface in vdk.api.
    One simple way to do that is to create vdk/api/data_job.py with something like
class SomeNameWeAgreeOnDataJob:

    def __init__(self):
        from vdk.internal.builtin_plugins.run import DataJobImpl
        self.__job = DataJobImpl()

    def __enter__(self):
        return self.__job.__enter__()

    def __exit__(self, exc_type, exc_value, traceback):
        return self.__job.__exit__(exc_type, exc_value, traceback)

    def get_job_input(self):
        return self.__job.get_job_input()
  4. Tests. I suggest some kind of "functional" test similar to any of those at https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-core/tests/functional

We have not yet set up our CI/CD to trigger from forks. So as long as the tests pass locally (running ./cicd/build.sh from the vdk-core folder succeeds), I think that's enough and I'll merge the change.

@mrdavidlaing
Contributor Author

Still working on this - just got sidetracked a bit.

Will hopefully push up some new code early next week.

@mrdavidlaing
Contributor Author

mrdavidlaing commented Apr 27, 2022

TODO

  • Finalize naming discussion - StandaloneDataJob
  • Create the public interface in vdk.api.
  • Take care of comments on the code - the code duplication in particular is something I'd like us to fix.
  • Tests.

Collaborator

@antoniivanov antoniivanov left a comment


That looks good to me. My suggestion is to postpone the refactoring (de-duplication) to a separate PR. We just need some automated tests, and then we can merge it.

@mrdavidlaing
Contributor Author

That looks good to me. My suggestion is to postpone the refactoring (de-duplication) to a separate PR. We just need some automated tests, and then we can merge it.

Sounds good.

I've been fighting a bit trying to get the build working on a Mac M1 (which doesn't officially support Python 3.7). I should have something to commit later in the week.

@antoniivanov
Collaborator

I've been fighting a bit trying to get the build working on a Mac M1 (which doesn't officially support Python 3.7). I should have something to commit later in the week.

You don't need to use Python 3.7. VDK works with all recent Python versions.

@mrdavidlaing
Contributor Author

Is there a way to disable/skip the use of Python 3.7 in the commit process?

❯ git commit
[INFO] Initializing environment for https://github.com/pre-commit/pre-commit-hooks.
[INFO] Initializing environment for https://github.com/jorisroovers/gitlint.
[INFO] Initializing environment for https://github.com/jorisroovers/gitlint:./gitlint-core[trusted-deps].
[INFO] Initializing environment for https://github.com/psf/black.
[INFO] Initializing environment for https://github.com/pycqa/pydocstyle.
[INFO] Initializing environment for https://github.com/pre-commit/mirrors-pylint.
[INFO] Initializing environment for https://github.com/asottile/reorder_python_imports.
[INFO] Initializing environment for https://github.com/asottile/pyupgrade.
[INFO] Initializing environment for https://github.com/Lucas-C/pre-commit-hooks.
[INFO] Installing environment for https://github.com/pre-commit/pre-commit-hooks.
[INFO] Once installed this environment will be reused.
[INFO] This may take a few minutes...
[INFO] Installing environment for https://github.com/psf/black.
[INFO] Once installed this environment will be reused.
[INFO] This may take a few minutes...
An unexpected error has occurred: CalledProcessError: command: ('/Users/mrdavidlaing/.pyenv/versions/3.8.13/bin/python3.8', '-mvirtualenv', '/Users/mrdavidlaing/.cache/pre-commit/repoej7q_26p/py_env-python3.7', '-p', 'python3.7')
return code: 1
expected return code: 0
stdout:
    RuntimeError: failed to find interpreter for Builtin discover of python_spec='python3.7'
    
stderr: (none)
Check the log at /Users/mrdavidlaing/.cache/pre-commit/pre-commit.log
❯ cat /Users/mrdavidlaing/.cache/pre-commit/pre-commit.log
### version information

pre-commit version: 2.18.1
git --version: git version 2.32.0 (Apple Git-132)
sys.version:
    3.8.13 (default, May  4 2022, 18:37:14) 
    [Clang 13.1.6 (clang-1316.0.21.2.3)]
sys.executable: /Users/mrdavidlaing/.pyenv/versions/3.8.13/bin/python3.8
os.name: posix
sys.platform: darwin

### error information

An unexpected error has occurred: CalledProcessError: command: ('/Users/mrdavidlaing/.pyenv/versions/3.8.13/bin/python3.8', '-mvirtualenv', '/Users/mrdavidlaing/.cache/pre-commit/repoej7q_26p/py_env-python3.7', '-p', 'python3.7')
return code: 1
expected return code: 0
stdout:
    RuntimeError: failed to find interpreter for Builtin discover of python_spec='python3.7'
    
stderr: (none)


Traceback (most recent call last):
  File "/Users/mrdavidlaing/.pyenv/versions/3.8.13/lib/python3.8/site-packages/pre_commit/error_handler.py", line 73, in error_handler
    yield
  File "/Users/mrdavidlaing/.pyenv/versions/3.8.13/lib/python3.8/site-packages/pre_commit/main.py", line 343, in main
    return hook_impl(
  File "/Users/mrdavidlaing/.pyenv/versions/3.8.13/lib/python3.8/site-packages/pre_commit/commands/hook_impl.py", line 237, in hook_impl
    return retv | run(config, store, ns)
  File "/Users/mrdavidlaing/.pyenv/versions/3.8.13/lib/python3.8/site-packages/pre_commit/commands/run.py", line 414, in run
    install_hook_envs(to_install, store)
  File "/Users/mrdavidlaing/.pyenv/versions/3.8.13/lib/python3.8/site-packages/pre_commit/repository.py", line 221, in install_hook_envs
    _hook_install(hook)
  File "/Users/mrdavidlaing/.pyenv/versions/3.8.13/lib/python3.8/site-packages/pre_commit/repository.py", line 79, in _hook_install
    lang.install_environment(
  File "/Users/mrdavidlaing/.pyenv/versions/3.8.13/lib/python3.8/site-packages/pre_commit/languages/python.py", line 202, in install_environment
    cmd_output_b(*venv_cmd, cwd='/')
  File "/Users/mrdavidlaing/.pyenv/versions/3.8.13/lib/python3.8/site-packages/pre_commit/util.py", line 146, in cmd_output_b
    raise CalledProcessError(returncode, cmd, retcode, stdout_b, stderr_b)
pre_commit.util.CalledProcessError: command: ('/Users/mrdavidlaing/.pyenv/versions/3.8.13/bin/python3.8', '-mvirtualenv', '/Users/mrdavidlaing/.cache/pre-commit/repoej7q_26p/py_env-python3.7', '-p', 'python3.7')
return code: 1
expected return code: 0
stdout:
    RuntimeError: failed to find interpreter for Builtin discover of python_spec='python3.7'
    
stderr: (none)

@antoniivanov
Collaborator

Is there a way to disable/skip the use of Python 3.7 in the commit process?

Interesting - that seems to be a configuration issue with our pre-commit hooks. I opened a PR to fix it: #826
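For context (a sketch only - the actual fix is whatever #826 changes): pre-commit resolves each hook's interpreter from the language_version / default_language_version settings in .pre-commit-config.yaml, so an error like the one above typically means a hook pins python3.7. Loosening such a pin might look like:

```yaml
# Hypothetical .pre-commit-config.yaml fragment - not the actual contents of #826.
# Instead of pinning an interpreter that may not exist on contributors' machines:
#   default_language_version:
#     python: python3.7
# let pre-commit use whatever python3 is available:
default_language_version:
  python: python3
```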

@mrdavidlaing
Contributor Author

@tozka With 54f4a09 done, I'm happy that we have a basic set of functional tests. Are there any other tests you'd like to see added?

Collaborator

@antoniivanov antoniivanov left a comment


It looks good to me. Have you run ./cicd/build.sh locally from the vdk-core folder?

We have not yet configured CI to run from forks, unfortunately, so I'd need to merge your change manually. I will run the tests myself, but it would be good to get confirmation first.

@mrdavidlaing
Contributor Author

@tozka Let me address the remaining Code Analysis warnings and squash the commits to make the merge nice and clean.

I'll ping you here when that is done

- Instantiate and execute plugin lifecycle from code rather than via the VDK CLI
- Gives access to an instantiated job_input object
- Can be run without needing any data job files
- Implemented as a context manager to reduce API surface area
- Triggers all plugin hooks except:
        * CoreHookSpecs.vdk_command_line

Sample usage:

    with StandaloneDataJobFactory.create(datajob_directory) as job_input:
        #... use job_input object to interact with SuperCollider
@mrdavidlaing force-pushed the 785-instantiate_job_input_from_code branch from 9b34c08 to cd9e4cd on May 11, 2022 at 13:35
@mrdavidlaing changed the title from "WIP: vdk-core: New feature: NoOpDataJob" to "vdk-core: New feature: StandaloneDataJob" on May 11, 2022
@mrdavidlaing
Contributor Author

mrdavidlaing commented May 11, 2022

@tozka I've addressed the Code Analysis warnings and squashed the commits into a single commit to ease merging.

I've also run projects/vdk-core/cicd/build.sh on my local machine; it seemed to succeed barring the failure below, which I don't think is related to the changes in this PR:

============================================================================== short test summary info ===============================================================================
FAILED tests/functional/run/test_run_sql_queries.py::test_run_dbapi_connection_no_such_db_type - assert 'VdkConfigurationError' in '2022-05-11 14:34:25,169 [VDK] simple-create-ins...

Results (42.93s):
     253 passed
       1 failed
         - tests/functional/run/test_run_sql_queries.py:72 test_run_dbapi_connection_no_such_db_type

Are you happy to progress to merging this PR?

@antoniivanov
Collaborator

@tozka I've addressed the Code Analysis warnings and squashed the commits into a single commit to ease merging.

I've also run projects/vdk-core/cicd/build.sh on my local machine; it seemed to succeed barring the failure below, which I don't think is related to the changes in this PR:

============================================================================== short test summary info ===============================================================================
FAILED tests/functional/run/test_run_sql_queries.py::test_run_dbapi_connection_no_such_db_type - assert 'VdkConfigurationError' in '2022-05-11 14:34:25,169 [VDK] simple-create-ins...

Results (42.93s):
     253 passed
       1 failed
         - tests/functional/run/test_run_sql_queries.py:72 test_run_dbapi_connection_no_such_db_type

Are you happy to progress to merging this PR?

Yes.

I ran all the tests (with your PR) locally a few times and they passed.

@antoniivanov antoniivanov merged commit 21a9163 into vmware:main May 11, 2022
@antoniivanov
Collaborator

I've merged your change. Congrats and thanks for your first contribution (I hope the first of many ;) )

The CI has kicked in - https://gitlab.com/vmware-analytics/versatile-data-kit/-/pipelines/536648262 - hopefully it will pass and your change will be automatically released.

@mrdavidlaing
Contributor Author

@tozka Am I correct in deducing that this feature landed in https://pypi.org/project/vdk-core/0.2.536648262/ ?

@antoniivanov
Collaborator

@tozka Am I correct in deducing that this feature landed in https://pypi.org/project/vdk-core/0.2.536648262/ ?

Yes
