Skip to content

Commit

Permalink
vdk-ipython: infer correctly job name and add more tests (#2602)
Browse files Browse the repository at this point in the history
If job name is not explcitly provided we should infer it correctly from
the path name.

Also I extended ipython test coverage to cover most of the other
untested job input methods

---------

Co-authored-by: Dilyan Marinov <[email protected]>
  • Loading branch information
antoniivanov and DeltaMichael authored Aug 30, 2023
1 parent a45205a commit 43f4cf8
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 9 deletions.
6 changes: 5 additions & 1 deletion projects/vdk-plugins/vdk-ipython/src/vdk/plugin/ipython.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,16 @@ def __init__(
):
path = pathlib.Path(path) if path else pathlib.Path(os.getcwd())
job_args = json.loads(arguments) if arguments else None
self._name = name
self._name = path.name if not name else name
self._path = path
self._arguments = job_args
self._template = template
self.job = None
self.job_input = None
log.debug(
f"Job Control for job with name {self._name} from path {self._path} "
f"with arguments {self._arguments} and template {self._template}"
)

def get_initialized_job_input(self):
"""
Expand Down
77 changes: 69 additions & 8 deletions projects/vdk-plugins/vdk-ipython/tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
# SPDX-License-Identifier: Apache-2.0
import logging
import time
from contextlib import contextmanager

import pytest
from _pytest._py.path import LocalPath
from _pytest.monkeypatch import MonkeyPatch
from IPython.core.error import UsageError
from IPython.testing.globalipapp import start_ipython
from vdk.api.job_input import IJobInput
Expand All @@ -27,6 +30,14 @@ def ip(session_ip):
session_ip.run_line_magic(magic_name="reset", line="-f")


@contextmanager
def get_vdk_ipython(session_ip, vdk_load_arguments_line=""):
session_ip.run_line_magic(magic_name="load_ext", line="vdk.plugin.ipython")
session_ip.run_line_magic(magic_name="reload_VDK", line=vdk_load_arguments_line)
yield session_ip
session_ip.run_line_magic(magic_name="reset", line="-f")


def test_load_vdk_with_no_arguments(ip):
assert ip.user_global_ns["VDK"] is not None
assert isinstance(ip.user_global_ns["VDK"], JobControl)
Expand All @@ -53,13 +64,13 @@ def test_get_initialized_job_input(ip):
assert isinstance(ip.user_global_ns["job_input"], IJobInput)


def test_calling_get_initialise_job_input_multiple_times(ip, tmpdir):
def test_calling_get_initialise_job_input_multiple_times(ip):
assert ip.user_global_ns["VDK"] is not None
assert isinstance(ip.user_global_ns["VDK"], JobControl)

# first call
ip.get_ipython().run_cell("job_input = VDK.get_initialized_job_input()")
result_job_input = ip.get_ipython().getoutput("job_input")
result_job_input = ip.user_global_ns["job_input"]

# test first called object
assert ip.user_global_ns["job_input"] is not None
Expand All @@ -73,16 +84,17 @@ def test_calling_get_initialise_job_input_multiple_times(ip, tmpdir):
assert isinstance(ip.user_global_ns["job_input"], IJobInput)

# check whether first job_input is the same as the second one
assert result_job_input == ip.get_ipython().getoutput("job_input")
assert result_job_input == ip.user_global_ns["job_input"]


# uses the pytest tmpdir fixture - https://docs.pytest.org/en/6.2.x/tmpdir.html#the-tmpdir-fixture
async def test_extension_with_ingestion_job(ip, tmpdir):
def test_extension_with_ingestion_job(ip, tmpdir):
# set environmental variables via Jupyter notebook
job_dir = str(tmpdir) + "vdk-sqlite.db"
ip.get_ipython().run_cell("%env VDK_INGEST_METHOD_DEFAULT=sqlite")
ip.get_ipython().run_cell(f"%env VDK_SQLITE_FILE={job_dir}")
ip.get_ipython().run_cell("%env VDK_DB_DEFAULT_TYPE=SQLITE")
ip.get_ipython().run_cell("%env INGESTER_WAIT_TO_FINISH_AFTER_EVERY_SEND=true")

# get the job_input
ip.get_ipython().run_cell("job_input = VDK.get_initialized_job_input()")
Expand Down Expand Up @@ -111,11 +123,11 @@ async def test_extension_with_ingestion_job(ip, tmpdir):
)

# get the data that is going to be ingested
ip.get_ipython().run_cell("import requests")
ip.get_ipython().run_cell("import json")
ip.get_ipython().run_cell(
'response = requests.get("https://jsonplaceholder.typicode.com/todos/1")'
"""raw = '{ "userId": 1, "id": 1, "title": "delectus aut autem", "completed": false }' """
)
ip.get_ipython().run_cell("payload = response.json()")
ip.get_ipython().run_cell("payload = json.loads(raw)")

# send data for ingestion
ip.get_ipython().run_cell(
Expand Down Expand Up @@ -177,6 +189,55 @@ def test_extension_with_pure_sql_job(ip, tmpdir):
)


def test_extension_with_job_input_get_job_dir(
session_ip, tmpdir: LocalPath, monkeypatch: MonkeyPatch
):
monkeypatch.chdir(str(tmpdir))
with get_vdk_ipython(session_ip) as ip:
ip.get_ipython().run_cell("job_input = VDK.get_initialized_job_input()")
assert ip.get_ipython().run_cell(
"str(job_input.get_job_directory())"
).result == str(tmpdir)


def test_extension_with_job_input_get_name(
session_ip, tmpdir: LocalPath, monkeypatch: MonkeyPatch
):
monkeypatch.chdir(str(tmpdir))
with get_vdk_ipython(session_ip) as ip:
ip.get_ipython().run_cell("job_input = VDK.get_initialized_job_input()")

assert (
ip.get_ipython().run_cell("job_input.get_name()").result == tmpdir.basename
)
assert ip.user_global_ns["job_input"].get_name() == tmpdir.basename


def test_extension_with_job_input_execution_properties(
ip, tmpdir: LocalPath, monkeypatch: MonkeyPatch
):
ip.get_ipython().run_cell("job_input = VDK.get_initialized_job_input()")

execution_properties = ip.user_global_ns["job_input"].get_execution_properties()
assert "pa__execution_id" in execution_properties
assert "pa__op_id" in execution_properties


def test_extension_with_job_input_get_arguments(session_ip):
with get_vdk_ipython(session_ip, '--arguments {"a":2}') as ip:
ip.get_ipython().run_cell("job_input = VDK.get_initialized_job_input()")

assert ip.get_ipython().run_cell("job_input.get_arguments()").result == {"a": 2}


def test_extension_with_job_input_properties(ip):
ip.get_ipython().run_cell("job_input = VDK.get_initialized_job_input()")
assert ip.get_ipython().run_cell("job_input.set_all_properties({'test':'my-test'})")
assert (
ip.get_ipython().run_cell("job_input.get_property('test')").result == "my-test"
)


def test_finalise(ip):
ip.get_ipython().run_cell("job_input = VDK.get_initialized_job_input()")

Expand Down Expand Up @@ -236,7 +297,7 @@ def test_call_finalize_before_get_initialized_job_input(ip):
# verifying that calling finalize before get_initialized_job_input won't produce errors(the method will not fail)


def test_calling_get_initialise_job_input_multiple_times_after_finalize(ip, tmpdir):
def test_calling_get_initialise_job_input_multiple_times_after_finalize(ip):
# first call
ip.get_ipython().run_cell("job_input = VDK.get_initialized_job_input()")
result_job_input = ip.get_ipython().run_cell("job_input").result
Expand Down

0 comments on commit 43f4cf8

Please sign in to comment.