Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show clearer grpc errors, as well as a custom failure for sensor timeouts in particular since those are so common #11576

Merged
merged 1 commit into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion python_modules/dagster/dagster/_api/snapshot_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dagster._core.errors import DagsterUserCodeProcessError
from dagster._core.host_representation.external_data import ExternalSensorExecutionErrorData
from dagster._core.host_representation.handle import RepositoryHandle
from dagster._grpc.client import DEFAULT_GRPC_TIMEOUT
from dagster._grpc.types import SensorExecutionArgs
from dagster._serdes import deserialize_as

Expand All @@ -20,6 +21,7 @@ def sync_get_external_sensor_execution_data_ephemeral_grpc(
last_completion_time: Optional[float],
last_run_key: Optional[str],
cursor: Optional[str],
timeout: Optional[int] = DEFAULT_GRPC_TIMEOUT,
) -> SensorExecutionData:
from dagster._grpc.client import ephemeral_grpc_api_client

Expand All @@ -35,6 +37,7 @@ def sync_get_external_sensor_execution_data_ephemeral_grpc(
last_completion_time,
last_run_key,
cursor,
timeout=timeout,
)


Expand All @@ -46,6 +49,7 @@ def sync_get_external_sensor_execution_data_grpc(
last_completion_time: Optional[float],
last_run_key: Optional[str],
cursor: Optional[str],
timeout: Optional[int] = DEFAULT_GRPC_TIMEOUT,
) -> SensorExecutionData:
check.inst_param(repository_handle, "repository_handle", RepositoryHandle)
check.str_param(sensor_name, "sensor_name")
Expand All @@ -64,7 +68,8 @@ def sync_get_external_sensor_execution_data_grpc(
last_completion_time=last_completion_time,
last_run_key=last_run_key,
cursor=cursor,
)
),
timeout=timeout,
),
(SensorExecutionData, ExternalSensorExecutionErrorData),
)
Expand Down
32 changes: 30 additions & 2 deletions python_modules/dagster/dagster/_grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,34 @@ def _get_response(
stub = DagsterApiStub(channel)
return getattr(stub, method)(request, metadata=self._metadata, timeout=timeout)

def _raise_grpc_exception(self, e: Exception, timeout, custom_timeout_message=None):
if isinstance(e, grpc.RpcError):
if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
raise DagsterUserCodeUnreachableError(
custom_timeout_message
or f"User code server request timed out due to taking longer than {timeout} seconds to complete."
) from e
else:
raise DagsterUserCodeUnreachableError(
f"Could not reach user code server. gRPC Error code: {e.code().name}"
) from e
else:
raise DagsterUserCodeUnreachableError("Could not reach user code server") from e

def _query(
self,
method,
request_type,
timeout=DEFAULT_GRPC_TIMEOUT,
custom_timeout_message=None,
**kwargs,
):
try:
return self._get_response(method, request=request_type(**kwargs), timeout=timeout)
except Exception as e:
raise DagsterUserCodeUnreachableError("Could not reach user code server") from e
self._raise_grpc_exception(
e, timeout=timeout, custom_timeout_message=custom_timeout_message
)

def _get_streaming_response(
self,
Expand All @@ -155,14 +172,17 @@ def _streaming_query(
method,
request_type,
timeout=DEFAULT_GRPC_TIMEOUT,
custom_timeout_message=None,
**kwargs,
):
try:
yield from self._get_streaming_response(
method, request=request_type(**kwargs), timeout=timeout
)
except Exception as e:
raise DagsterUserCodeUnreachableError("Could not reach user code server") from e
self._raise_grpc_exception(
e, timeout=timeout, custom_timeout_message=custom_timeout_message
)

def ping(self, echo):
check.str_param(echo, "echo")
Expand Down Expand Up @@ -365,6 +385,13 @@ def external_sensor_execution(self, sensor_execution_args, timeout=DEFAULT_GRPC_
SensorExecutionArgs,
)

custom_timeout_message = (
f"The sensor tick timed out due to taking longer than {timeout} seconds to execute the"
" sensor function. One way to avoid this error is to break up the sensor work into"
" chunks, using cursors to let subsequent sensor calls pick up where the previous call"
" left off."
)

chunks = list(
self._streaming_query(
"ExternalSensorExecution",
Expand All @@ -373,6 +400,7 @@ def external_sensor_execution(self, sensor_execution_args, timeout=DEFAULT_GRPC_
serialized_external_sensor_execution_args=serialize_dagster_namedtuple(
sensor_execution_args
),
custom_timeout_message=custom_timeout_message,
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pytest
from dagster._api.snapshot_sensor import sync_get_external_sensor_execution_data_ephemeral_grpc
from dagster._core.definitions.sensor_definition import SensorExecutionData
from dagster._core.errors import DagsterUserCodeProcessError
from dagster._core.errors import DagsterUserCodeProcessError, DagsterUserCodeUnreachableError

from .utils import get_bar_repo_handle

Expand Down Expand Up @@ -32,3 +32,17 @@ def test_external_sensor_raises_dagster_error(instance):
sync_get_external_sensor_execution_data_ephemeral_grpc(
instance, repository_handle, "sensor_raises_dagster_error", None, None, None
)


def test_external_sensor_timeout(instance):
with get_bar_repo_handle(instance) as repository_handle:
with pytest.raises(
DagsterUserCodeUnreachableError,
match=(
"The sensor tick timed out due to taking longer than 0 seconds to execute the"
" sensor function."
),
):
sync_get_external_sensor_execution_data_ephemeral_grpc(
instance, repository_handle, "sensor_foo", None, None, None, timeout=0
)
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ def test_load_grpc_server(capfd):
assert f"Started Dagster code server for file {python_file} on port {port} in process" in out


def test_grpc_connection_error():
port = find_free_port()
client = DagsterGrpcClient(port=port, host="localhost")
with pytest.raises(
DagsterUserCodeUnreachableError,
match="Could not reach user code server. gRPC Error code: UNAVAILABLE",
):
client.ping("foobar")


def test_python_environment_args():
port = find_free_port()
python_file = file_relative_path(__file__, "grpc_repo.py")
Expand Down