Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix AttributeError/Remove use of NoteableKernelManager #194

Merged
merged 5 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.0.28
current_version = 0.0.29
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)
serialize =
{major}.{minor}.{patch}
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
## [0.0.29] - 2023-07-20
### Removed
- Remove use of `NoteableKernelManager` to reduce unnecessary abstractions

### Fixed
- Fix `AttributeError` on trying to fetch `NoteableClient.file_session_cache` which has been removed. Determine kernels channel id from file_id instead.

## [0.0.28] - 2023-07-17
### Changed
- Allow any version of `noteable-origami` to be installed with this library
Expand Down
2 changes: 1 addition & 1 deletion papermill_origami/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "0.0.28"
version = "0.0.29"
68 changes: 22 additions & 46 deletions papermill_origami/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
)
from papermill.engines import Engine, NotebookExecutionManager

from .manager import NoteableKernelManager
from .util import flatten_dict, parse_noteable_file_id

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -74,7 +73,6 @@ def __init__(
self,
nb_man: NotebookExecutionManager,
client: Optional[NoteableClient] = None,
km: Optional[NoteableKernelManager] = None,
timeout_func=None,
timeout: float = None,
log_output: bool = False,
Expand All @@ -88,19 +86,15 @@ def __init__(
----------
nb_man : NotebookExecutionManager
Notebook execution manager wrapper being executed.
km : KernelManager (optional)
Optional kernel manager. If none is provided, a kernel manager will
be created.
"""
self.nb_man = nb_man
self.client = client
self.km = km
self.timeout_func = timeout_func
self.timeout = timeout
self.log_output = log_output
self.stdout_file = stdout_file
self.stderr_file = stderr_file
self.kernel_name = kw.get('kernel_name', '__NOT_SET__')
self.kernel_name = kw.get('kernel_name')
self.nb = nb_man.nb
# Map parent_collection_id to cell_id in order to process any append_output_events
# which are uniquely identified by parent_collection_id and not cell_id
Expand Down Expand Up @@ -136,7 +130,7 @@ def wrapper(cell, *args, **kwargs):
cell.metadata.papermill, parent_key_tuple=("papermill",)
).items():
try:
run_sync(self.km.client.update_cell_metadata)(
run_sync(self.client.update_cell_metadata)(
file=self.file,
cell_id=cell.id,
metadata_update_properties={"path": key, "value": value},
Expand Down Expand Up @@ -245,7 +239,6 @@ async def execute(self, **kwargs):
ext_logger.info(f"Parameterized notebook available at {parameterized_url}")

# Temporarily sleep for 1s to wait for file to be available to be subscribed to.
# TODO: remove this once Noteable API fix is deployed to prod.
await asyncio.sleep(1)

try:
Expand Down Expand Up @@ -287,9 +280,6 @@ async def execute(self, **kwargs):

# Override the notebook_complete method and set it to a no-op (since we already called it)
self.nb_man.notebook_complete = lambda: None

# info_msg = self.wait_for_reply(self.kc.kernel_info())
# self.nb.metadata['language_info'] = info_msg['content']['language_info']
except: # noqa
logger.exception("Error executing notebook")
if self.job_instance_attempt:
Expand Down Expand Up @@ -319,11 +309,11 @@ async def sync_noteable_nb_with_papermill(
deleted_cell_ids = list(set(noteable_nb_cell_ids) - set(papermill_nb_cell_ids))
added_cell_ids = list(set(papermill_nb_cell_ids) - set(noteable_nb_cell_ids))
for cell_id in deleted_cell_ids:
await self.km.client.delete_cell(file, cell_id)
await self.client.delete_cell(file, cell_id)
for cell_id in added_cell_ids:
idx = papermill_nb_cell_ids.index(cell_id)
after_id = papermill_nb_cell_ids[idx - 1] if idx > 0 else None
await self.km.client.add_cell(file, cell=papermill_nb.cells[idx], after_id=after_id)
await self.client.add_cell(file, cell=papermill_nb.cells[idx], after_id=after_id)

ext_logger.info(
"Synced notebook with Noteable, "
Expand All @@ -339,38 +329,26 @@ async def sync_noteable_nb_metadata_with_papermill(self):
self.nb.metadata.papermill, parent_key_tuple=("papermill",)
).items():
try:
await self.km.client.update_nb_metadata(self.file, {"path": key, "value": value})
await self.client.update_nb_metadata(self.file, {"path": key, "value": value})
except (asyncio.exceptions.TimeoutError, websockets.exceptions.ConnectionClosedError):
logger.debug("Encountered an error while updating notebook metadata")
pass

@staticmethod
def create_kernel_manager(file: NotebookFile, client: NoteableClient, **kwargs):
"""Helper that generates a kernel manager object from kwargs"""
return NoteableKernelManager(file, client, **kwargs)

@asynccontextmanager
async def setup_kernel(self, cleanup_kc=True, cleanup_kc_on_error=False, **kwargs) -> Generator:
"""Context manager for setting up the kernel to execute a notebook."""
ext_logger = kwargs["logger"]

if self.km is None:
# Assumes that file and client are being passed in
self.km = self.create_kernel_manager(**kwargs)
# Pass in the kernel name if specified
launch_kwargs = {}
if self.kernel_name is not None:
launch_kwargs["kernel_name"] = self.kernel_name

await self.client.get_or_launch_ready_kernel_session(self.file, **launch_kwargs)

await self.km.async_start_kernel(**kwargs)
ext_logger.info("Started kernel")

try:
yield
# if cleanup_kc:
# if await self.km.async_is_alive():
# await self.km.async_shutdown_kernel()
finally:
pass
# if cleanup_kc and cleanup_kc_on_error:
# if await self.km.async_is_alive():
# await self.km.async_shutdown_kernel()
yield

sync_execute = run_sync(execute)

Expand All @@ -381,7 +359,7 @@ def _cell_exception(self, cell, cell_index=None, **kwargs):
self.catch_cell_metadata_updates(self.nb_man.cell_exception)(cell, cell_index, **kwargs)
# Manually update the Noteable nb metadata
try:
run_sync(self.km.client.update_nb_metadata)(
run_sync(self.client.update_nb_metadata)(
self.file, {"path": ["papermill", "exception"], "value": True}
)
except (asyncio.exceptions.TimeoutError, websockets.exceptions.ConnectionClosedError):
Expand All @@ -408,34 +386,35 @@ async def papermill_execute_cells(self):
metadata of each cell.
"""

files_channel = self.km.client.files_channel(file_id=self.km.file.id)
self.km.client.register_message_callback(
files_channel = self.client.files_channel(file_id=self.file.id)
kernels_channel = self.client.kernels_channel(file_id=self.file.id)
self.client.register_message_callback(
self._update_outputs_callback,
files_channel,
"update_output_collection_event",
response_schema=UpdateOutputCollectionEventSchema,
once=False,
)

self.km.client.register_message_callback(
self.client.register_message_callback(
self._append_outputs_callback,
files_channel,
"append_output_event",
response_schema=AppendOutputEventSchema,
once=False,
)

self.km.client.register_message_callback(
self.client.register_message_callback(
self._display_handler_update_callback,
files_channel,
"update_outputs_by_display_id_event",
response_schema=DisplayHandlerUpdateEventSchema,
once=False,
)

self.km.client.register_message_callback(
self.client.register_message_callback(
self._update_execution_count_callback,
self.km.kernel.kernel_channel,
kernels_channel,
"bulk_cell_state_update_event",
response_schema=BulkCellStateMessage,
once=False,
Expand Down Expand Up @@ -651,7 +630,7 @@ async def async_execute_cell(
cell : NotebookNode
The cell which was just processed.
"""
assert self.km.client is not None
assert self.client is not None
if cell.cell_type != 'code':
logger.debug("Skipping non-executing cell %s", cell_index)
return cell
Expand All @@ -672,11 +651,8 @@ async def async_execute_cell(

# By default this will wait until the cell execution status is no longer active

result = await self.km.client.execute(self.km.file, cell.id)
# TODO: This wasn't behaving correctly with the timeout?!
# result = await asyncio.wait_for(self.km.client.execute(self.km.file, cell.id), self._get_timeout(cell))
result = await self.client.execute(self.file, cell.id)
if result.state.is_error_state:
# TODO: Add error info from stacktrace output messages
raise CellExecutionError("", str(result.state), "Cell execution failed")
return cell

Expand Down
125 changes: 0 additions & 125 deletions papermill_origami/manager.py

This file was deleted.

Loading