Skip to content

Commit

Permalink
vdk-notebook: Support for "%%vdkingest" cell type in Notebook Steps (#…
Browse files Browse the repository at this point in the history
…2867)

This commit introduces the ability to recognize and execute cells with
the `%%vdkingest` magic function in notebook-based data jobs.
Previously, notebook steps only supported Python and SQL cell types
(`%%vdksql`).

This addition enriches the functionality of notebook-based steps by
providing users the capability to define ingestion tasks directly within
a Jupyter notebook, enhancing flexibility and usability.
  • Loading branch information
antoniivanov authored Nov 15, 2023
1 parent 538275c commit 5009ee8
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 6 deletions.
3 changes: 2 additions & 1 deletion projects/vdk-plugins/vdk-notebook/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ vdk run example-job
```

- Each cell which has tag "vdk" is executed
- There could be two type of VDK Cells:
- There could be those types of VDK Cells:
- VDK Python Cell. Those are normal Python cells tagged with "vdk"
- VDK SQL Cell. Those are cells marked with `%%vdksql` magic and tagged with "vdk"
- VDK Ingest Cell. Those are celles marked with `%%vdkingest` magic.

For more information and examples see [vdk-ipython](../vdk-ipython/README.md) documentation

Expand Down
1 change: 1 addition & 0 deletions projects/vdk-plugins/vdk-notebook/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ click
pytest
requests
vdk-core
vdk-data-sources
vdk-sqlite
vdk-test-utils
1 change: 1 addition & 0 deletions projects/vdk-plugins/vdk-notebook/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@
"Programming Language :: Python :: 3.10",
],
python_requires="~=3.7",
extras_require={"data-sources": ["vdk-data-sources"]},
)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from vdk.internal.builtin_plugins.run.file_based_step import TYPE_PYTHON
from vdk.internal.builtin_plugins.run.file_based_step import TYPE_SQL
from vdk.plugin.notebook.vdk_ingest import TYPE_INGEST


@dataclass
Expand All @@ -25,4 +26,7 @@ def __extract_source_code(jupyter_cell):
if lines and lines[0].strip().startswith("%%vdksql"):
statement = "".join(lines[1:])
return statement, TYPE_SQL
if lines and lines[0].strip().startswith("%%vdkingest"):
source = "".join(lines[1:])
return source, TYPE_INGEST
return "".join(lines), TYPE_PYTHON
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from pathlib import Path
from typing import List

from vdk.internal.builtin_plugins.run.file_based_step import TYPE_PYTHON
from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.core import errors
from vdk.internal.core.errors import UserCodeError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from vdk.internal.builtin_plugins.run.step import Step
from vdk.internal.core import errors
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.notebook import vdk_ingest
from vdk.plugin.notebook.vdk_ingest import TYPE_INGEST

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -74,6 +76,8 @@ def get_run_function(source_type: Union[TYPE_PYTHON, TYPE_SQL]) -> Callable:
return NotebookStepFuncFactory.run_python_step
elif source_type == TYPE_SQL:
return NotebookStepFuncFactory.run_sql_step
elif source_type == TYPE_INGEST:
return vdk_ingest.run_ingest_step
else:
raise NotImplementedError(
f"Run function for source type {source_type} is not implemented."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import json
import logging

from vdk.api.plugin.hook_markers import hookimpl
from vdk.api.plugin.plugin_registry import HookCallResult
from vdk.api.plugin.plugin_registry import IPluginRegistry
from vdk.internal.builtin_plugins.run.execution_results import ExecutionResult
from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.builtin_plugins.run.run_status import ExecutionStatus
from vdk.plugin.notebook.notebook import JobNotebookLocator
from vdk.plugin.notebook.notebook import Notebook

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from vdk.api.job_input import IJobInput


TYPE_INGEST = "ingest"


def run_ingest_step(step: "NotebookCellStep", job_input: IJobInput) -> bool:
"""
Run ingest data flow step. Only if vdk-data-sources is installed.
"""
try:
from vdk.plugin.data_sources.mapping.data_flow import DataFlowInput
from vdk.plugin.data_sources.mapping import toml_parser
except ImportError:
raise ImportError(
"vdk-data-sources is not installed. ingestion step is not available without it"
)

import toml

parsed_toml = toml.loads(step.source)
definitions = toml_parser.definitions_from_dict(parsed_toml)

with DataFlowInput(job_input) as flow_input:
flow_input.start_flows(definitions)
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "654db028-2d0e-4f95-a377-e1fac1255f08",
"metadata": {
"tags": [
"vdk"
],
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"%%vdkingest\n",
"[sources]\n",
"s1 = {name = \"auto-generated-data\"}\n",
"\n",
"[destinations]\n",
"d1 = {method = \"memory\"}\n",
"\n",
"[[flows]]\n",
"from=\"s1\"\n",
"to=\"d1\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "532975cd-2e85-465f-b510-bdfd264192cf",
"metadata": {
"tags": [
"vdk"
]
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
24 changes: 24 additions & 0 deletions projects/vdk-plugins/vdk-notebook/tests/test_vdkingest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from click.testing import Result
from vdk.plugin.data_sources import plugin_entry as data_sources_plugin_entry
from vdk.plugin.notebook import notebook_plugin
from vdk.plugin.test_utils.util_funcs import cli_assert_equal
from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner
from vdk.plugin.test_utils.util_funcs import jobs_path_from_caller_directory
from vdk.plugin.test_utils.util_plugins import IngestIntoMemoryPlugin


def test_ingest_vdkingest():
ingest_plugin = IngestIntoMemoryPlugin()
runner = CliEntryBasedTestRunner(
ingest_plugin, data_sources_plugin_entry, notebook_plugin
)

result: Result = runner.invoke(
["run", jobs_path_from_caller_directory("ingest-data-flow-job")]
)

cli_assert_equal(0, result)

assert len(ingest_plugin.payloads) > 0

0 comments on commit 5009ee8

Please sign in to comment.