Skip to content

Commit

Permalink
vdk-ipython: add %%vdkingest magic function
Browse files Browse the repository at this point in the history
This commit introduces a new IPython magic function, `vdkingest`, which
enables data ingestion through TOML configuration within a Jupyter
notebook. This enhancement aims to simplify and streamline the data
ingestion process by allowing users to declare their data flows in a
structured format (TOML), thereby making it more accessible and
maintainable.

This is an optional feature of the plugin. If `vdk-data-sources` is not
installed, an ImportError is raised with an informative message.
  • Loading branch information
antoniivanov committed Nov 14, 2023
1 parent 745b14d commit 1ba3ad8
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 5 deletions.
39 changes: 39 additions & 0 deletions projects/vdk-plugins/vdk-ipython/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,45 @@ select * from placeholder_todo
where completed = True
```

### Ingesting data with %%vdkingest

```toml
%%vdkingest

# Data Source Configuration
[sources.yourSourceId]
## Data Source Name. Installed dta sources can be seen using vdk data-sources --list
name = "<data-source-name>"
## The singer tap we will use
config = {
## Set the configuration for the data source.
## You can see what config options are supported with vdk data-sources --config <data-source-name>
}

[sources.yourSourceId_2]
# repeat this for as many sources you want
# ...

# Data Destination Configuration.
## Ingestion methods and targets are the same one as those accepted by send_object_for_ingestion
## See https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-core/src/vdk/api/job_input.py#L183
[destinations.yourDestinationId]
## the only required filed is method
method = "<method-name>"
## Optionally specify target
## target =

[destinations.yourDestinationId_2]
# repeat this for as many destinations you want
# ...

# Data Flows from Source to Destination
[[flows]]
from = "yourSourceId"
to = "yourDestinationId"
```

Complete the full self-paced tutorial at https://bit.ly/vdk-ingest

### Build and testing

Expand Down
1 change: 1 addition & 0 deletions projects/vdk-plugins/vdk-ipython/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ ipyaggrid
# testing dependecies
pytest
vdk-core
vdk-data-sources
vdk-sqlite
vdk-test-utils
3 changes: 2 additions & 1 deletion projects/vdk-plugins/vdk-ipython/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import setuptools


__version__ = "0.1.0"
__version__ = "0.2.5"

setuptools.setup(
name="vdk-ipython",
Expand All @@ -28,4 +28,5 @@
"Programming Language :: Python :: 3.11",
"Framework :: IPython",
],
extras_require={"data-sources": ["vdk-data-sources"]},
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0
import logging

from vdk.plugin.ipython.ingest import vdkingest
from vdk.plugin.ipython.job import load_job
from vdk.plugin.ipython.job import magic_load_job
from vdk.plugin.ipython.sql import vdksql
Expand All @@ -20,3 +21,6 @@ def load_ipython_extension(ipython):
"""
ipython.register_magic_function(magic_load_job, magic_name="reload_VDK")
ipython.register_magic_function(vdksql, magic_kind="cell", magic_name="vdksql")
ipython.register_magic_function(
vdkingest, magic_kind="cell", magic_name="vdkingest"
)
50 changes: 50 additions & 0 deletions projects/vdk-plugins/vdk-ipython/src/vdk/plugin/ipython/ingest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import warnings

from IPython import get_ipython
from vdk.api.job_input import IJobInput
from vdk.plugin.ipython import job
from vdk.plugin.ipython.common import show_ipython_error
from vdk.plugin.ipython.job import JobControl

log = logging.getLogger(__name__)


def vdkingest(line, cell):
"""
TOML based ingestion configuration
"""

vdk: JobControl = get_ipython().user_global_ns.get("VDK", None)
if not vdk:
log.warning(
"VDK is not initialized with '%reload_VDK'. "
"Will auto-initialize now wth default parameters."
)
job.load_job()
vdk = get_ipython().user_global_ns.get("VDK", None)
if not vdk:
message = "VDK cannot initialized. Please execute: %reload_VDK"
show_ipython_error(message)
return None

job_input: IJobInput = vdk.get_initialized_job_input()

try:
from vdk.plugin.data_sources.mapping.data_flow import DataFlowInput
except ImportError:
raise ImportError(
"vdk-data-sources is not installed. %%vdkingest is not available without it"
)

from vdk.plugin.data_sources.mapping import toml_parser
import toml

parsed_toml = toml.loads(cell)
definitions = toml_parser.definitions_from_dict(parsed_toml)

with DataFlowInput(job_input) as flow_input:
flow_input.start_flows(definitions)
5 changes: 3 additions & 2 deletions projects/vdk-plugins/vdk-ipython/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ def ip(session_ip):


@pytest.fixture(scope="function")
def sqlite_ip(ip, tmpdir):
job_dir = str(tmpdir) + "vdk-sqlite.db"
def sqlite_ip(ip, tmp_path):
job_dir = f"{tmp_path}/vdk-sqlite.db"
ip.get_ipython().run_cell("%env VDK_INGEST_METHOD_DEFAULT=sqlite")
ip.get_ipython().run_cell(f"%env VDK_SQLITE_FILE={job_dir}")
ip.get_ipython().run_cell(f"%env VDK_INGEST_TARGET_DEFAULT={job_dir}")
ip.get_ipython().run_cell("%env VDK_DB_DEFAULT_TYPE=SQLITE")
ip.get_ipython().run_cell("%env INGESTER_WAIT_TO_FINISH_AFTER_EVERY_SEND=true")
yield ip
40 changes: 40 additions & 0 deletions projects/vdk-plugins/vdk-ipython/tests/test_ingest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
import os
from unittest.mock import patch

import ipyaggrid

_log = logging.getLogger(__name__)


@patch.dict(
os.environ,
{"USE_DEFAULT_CELL_TABLE_OUTPUT": "true"},
)
def test_ingest_cell(sqlite_ip, capsys):
query = """%%vdkingest
[sources]
s1 = {name = "auto-generated-data"}
[destinations]
d1 = {method = "sqlite"}
[[flows]]
from="s1"
to="d1"
"""
sqlite_ip.get_ipython().run_cell(query)
assert capsys.readouterr().out == ""

select_query = """
%%vdksql
SELECT * from stream_0
"""
capsys.readouterr() # reset buffer
select_output = sqlite_ip.get_ipython().run_cell(select_query).result
assert select_output.values.tolist() == [
[1, "Stream_0_Name_0", 0],
[2, "Stream_0_Name_1", 0],
]
5 changes: 3 additions & 2 deletions projects/vdk-plugins/vdk-ipython/tests/test_job_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,12 @@ def test_calling_get_initialise_job_input_multiple_times(ip):


# uses the pytest tmpdir fixture - https://docs.pytest.org/en/6.2.x/tmpdir.html#the-tmpdir-fixture
def test_extension_with_ingestion_job(ip, tmpdir):
def test_extension_with_ingestion_job(ip, tmp_path):
# set environmental variables via Jupyter notebook
job_dir = str(tmpdir) + "vdk-sqlite.db"
job_dir = f"{tmp_path}/vdk-sqlite.db"
ip.get_ipython().run_cell("%env VDK_INGEST_METHOD_DEFAULT=sqlite")
ip.get_ipython().run_cell(f"%env VDK_SQLITE_FILE={job_dir}")
ip.get_ipython().run_cell(f"%env VDK_INGEST_TARGET_DEFAULT={job_dir}")
ip.get_ipython().run_cell("%env VDK_DB_DEFAULT_TYPE=SQLITE")
ip.get_ipython().run_cell("%env INGESTER_WAIT_TO_FINISH_AFTER_EVERY_SEND=true")

Expand Down

0 comments on commit 1ba3ad8

Please sign in to comment.