From 6bc417bb6ff1ba00c86d8c539721e3b12e380368 Mon Sep 17 00:00:00 2001 From: gibsondan Date: Sun, 8 Jan 2023 22:10:23 -0600 Subject: [PATCH] Show clearer grpc errors, as well as a custom failure for sensor timeouts in particular since those are so common Summary: - surface the error code instead of burying it in the nested cause error - for timeouts, include the original timeout (and that it timed out) - For sensors, include how you might fix it --- .../dagster/dagster/_api/snapshot_sensor.py | 7 +++- .../dagster/dagster/_grpc/client.py | 32 +++++++++++++++++-- .../api_tests/test_api_snapshot_sensor.py | 16 +++++++++- .../grpc_tests/test_persistent.py | 18 +++++++++++ 4 files changed, 69 insertions(+), 4 deletions(-) diff --git a/python_modules/dagster/dagster/_api/snapshot_sensor.py b/python_modules/dagster/dagster/_api/snapshot_sensor.py index e9502a7dc1509..f47761e9c70ab 100644 --- a/python_modules/dagster/dagster/_api/snapshot_sensor.py +++ b/python_modules/dagster/dagster/_api/snapshot_sensor.py @@ -5,6 +5,7 @@ from dagster._core.errors import DagsterUserCodeProcessError from dagster._core.host_representation.external_data import ExternalSensorExecutionErrorData from dagster._core.host_representation.handle import RepositoryHandle +from dagster._grpc.client import DEFAULT_GRPC_TIMEOUT from dagster._grpc.types import SensorExecutionArgs from dagster._serdes import deserialize_as @@ -20,6 +21,7 @@ def sync_get_external_sensor_execution_data_ephemeral_grpc( last_completion_time: Optional[float], last_run_key: Optional[str], cursor: Optional[str], + timeout: Optional[int] = DEFAULT_GRPC_TIMEOUT, ) -> SensorExecutionData: from dagster._grpc.client import ephemeral_grpc_api_client @@ -35,6 +37,7 @@ def sync_get_external_sensor_execution_data_ephemeral_grpc( last_completion_time, last_run_key, cursor, + timeout=timeout, ) @@ -46,6 +49,7 @@ def sync_get_external_sensor_execution_data_grpc( last_completion_time: Optional[float], last_run_key: Optional[str], cursor: Optional[str], + timeout: Optional[int] = DEFAULT_GRPC_TIMEOUT, ) -> SensorExecutionData: check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(sensor_name, "sensor_name") @@ -64,7 +68,8 @@ def sync_get_external_sensor_execution_data_grpc( last_completion_time=last_completion_time, last_run_key=last_run_key, cursor=cursor, - ) + ), + timeout=timeout, ), (SensorExecutionData, ExternalSensorExecutionErrorData), ) diff --git a/python_modules/dagster/dagster/_grpc/client.py b/python_modules/dagster/dagster/_grpc/client.py index 4363b708854ed..38d019e5d58ff 100644 --- a/python_modules/dagster/dagster/_grpc/client.py +++ b/python_modules/dagster/dagster/_grpc/client.py @@ -128,17 +128,34 @@ def _get_response( stub = DagsterApiStub(channel) return getattr(stub, method)(request, metadata=self._metadata, timeout=timeout) + def _raise_grpc_exception(self, e: Exception, timeout, custom_timeout_message=None): + if isinstance(e, grpc.RpcError): + if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED: + raise DagsterUserCodeUnreachableError( + custom_timeout_message + or f"User code server request timed out due to taking longer than {timeout} seconds to complete." + ) from e + else: + raise DagsterUserCodeUnreachableError( + f"Could not reach user code server. gRPC Error code: {e.code().name}" + ) from e + else: + raise DagsterUserCodeUnreachableError("Could not reach user code server") from e + def _query( self, method, request_type, timeout=DEFAULT_GRPC_TIMEOUT, + custom_timeout_message=None, **kwargs, ): try: return self._get_response(method, request=request_type(**kwargs), timeout=timeout) except Exception as e: - raise DagsterUserCodeUnreachableError("Could not reach user code server") from e + self._raise_grpc_exception( + e, timeout=timeout, custom_timeout_message=custom_timeout_message + ) def _get_streaming_response( self, @@ -155,6 +172,7 @@ def _streaming_query( method, request_type, timeout=DEFAULT_GRPC_TIMEOUT, + custom_timeout_message=None, **kwargs, ): try: @@ -162,7 +180,9 @@ def _streaming_query( method, request=request_type(**kwargs), timeout=timeout ) except Exception as e: - raise DagsterUserCodeUnreachableError("Could not reach user code server") from e + self._raise_grpc_exception( + e, timeout=timeout, custom_timeout_message=custom_timeout_message + ) def ping(self, echo): check.str_param(echo, "echo") @@ -365,6 +385,13 @@ def external_sensor_execution(self, sensor_execution_args, timeout=DEFAULT_GRPC_ SensorExecutionArgs, ) + custom_timeout_message = ( + f"The sensor tick timed out due to taking longer than {timeout} seconds to execute the" + " sensor function. One way to avoid this error is to break up the sensor work into" + " chunks, using cursors to let subsequent sensor calls pick up where the previous call" + " left off." + ) + chunks = list( self._streaming_query( "ExternalSensorExecution", @@ -373,6 +400,7 @@ def external_sensor_execution(self, sensor_execution_args, timeout=DEFAULT_GRPC_ serialized_external_sensor_execution_args=serialize_dagster_namedtuple( sensor_execution_args ), + custom_timeout_message=custom_timeout_message, ) ) diff --git a/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py b/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py index c9cd1749ed13d..42f69a54dd610 100644 --- a/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py +++ b/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py @@ -1,7 +1,7 @@ import pytest from dagster._api.snapshot_sensor import sync_get_external_sensor_execution_data_ephemeral_grpc from dagster._core.definitions.sensor_definition import SensorExecutionData -from dagster._core.errors import DagsterUserCodeProcessError +from dagster._core.errors import DagsterUserCodeProcessError, DagsterUserCodeUnreachableError from .utils import get_bar_repo_handle @@ -32,3 +32,17 @@ def test_external_sensor_raises_dagster_error(instance): sync_get_external_sensor_execution_data_ephemeral_grpc( instance, repository_handle, "sensor_raises_dagster_error", None, None, None ) + + +def test_external_sensor_timeout(instance): + with get_bar_repo_handle(instance) as repository_handle: + with pytest.raises( + DagsterUserCodeUnreachableError, + match=( + "The sensor tick timed out due to taking longer than 0 seconds to execute the" + " sensor function." + ), + ): + sync_get_external_sensor_execution_data_ephemeral_grpc( + instance, repository_handle, "sensor_foo", None, None, None, timeout=0 + ) diff --git a/python_modules/dagster/dagster_tests/general_tests/grpc_tests/test_persistent.py b/python_modules/dagster/dagster_tests/general_tests/grpc_tests/test_persistent.py index 53aeb19d7a8c4..fcc27ddccd60f 100644 --- a/python_modules/dagster/dagster_tests/general_tests/grpc_tests/test_persistent.py +++ b/python_modules/dagster/dagster_tests/general_tests/grpc_tests/test_persistent.py @@ -24,6 +24,7 @@ poll_for_finished_run, ) from dagster._core.types.loadable_target_origin import LoadableTargetOrigin +from dagster._grpc.__generated__ import api_pb2 from dagster._grpc.client import DagsterGrpcClient from dagster._grpc.server import ( ExecuteExternalPipelineArgs, @@ -65,6 +66,13 @@ def test_load_grpc_server(capfd): wait_for_grpc_server(process, client, subprocess_args) assert client.ping("foobar") == "foobar" + # check timeout + with pytest.raises( + DagsterUserCodeUnreachableError, + match="User code server request timed out due to taking longer than 0 seconds to complete.", + ): + client._query("Ping", api_pb2.PingRequest, echo="Time out", timeout=0) + list_repositories_response = sync_list_repositories_grpc(client) assert list_repositories_response.entry_point == ["dagster"] assert list_repositories_response.executable_path == sys.executable @@ -85,6 +93,16 @@ def test_load_grpc_server(capfd): assert f"Started Dagster code server for file {python_file} on port {port} in process" in out +def test_grpc_connection_error(): + port = find_free_port() + client = DagsterGrpcClient(port=port, host="localhost") + with pytest.raises( + DagsterUserCodeUnreachableError, + match="Could not reach user code server. gRPC Error code: UNAVAILABLE", + ): + client.ping("foobar") + + def test_python_environment_args(): port = find_free_port() python_file = file_relative_path(__file__, "grpc_repo.py")