Skip to content

Commit

Permalink
Fix job tick history queries when the repo is defined as a dict
Browse files Browse the repository at this point in the history
Summary:
We were not handling the jobState graphql queries correctly when schedules were loaded using the lazy repository dictionary definition.

This diff hides `jobs` (but does not throw) as a repository dictionary key and adds sensors.

It also merges schedules and sensors into jobs, making sure that the `jobState` query resolves correctly.

I changed some of the graphql test structure to load a lazy repository dictionary definition as one of the multi_location test variants.  Then I hooked up the jobs tests to query the secondary location repository and made sure it loaded correctly.

Should resolve #3626

Test Plan: bk

Reviewers: dgibson, johann

Reviewed By: dgibson

Differential Revision: https://dagster.phacility.com/D6359
  • Loading branch information
prha committed Feb 9, 2021
1 parent 1338ca2 commit 33ac358
Show file tree
Hide file tree
Showing 14 changed files with 254 additions and 213 deletions.
6 changes: 5 additions & 1 deletion js_modules/dagit/src/jobs/TickHistory.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ export const JobTickHistory = ({
</Tabs>
);

if (!data || data?.jobStateOrError.__typename !== 'JobState') {
if (!data) {
return (
<Group direction="column" spacing={12}>
<Subheading>Tick History</Subheading>
Expand All @@ -157,6 +157,10 @@ export const JobTickHistory = ({
);
}

if (data.jobStateOrError.__typename === 'PythonError') {
return <PythonErrorInfo error={data.jobStateOrError} />;
}

const {ticks, nextTick, jobType} = data.jobStateOrError;
const displayedTicks = ticks.filter((tick) =>
tick.status === JobTickStatus.SKIPPED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dagster.core.host_representation import ExternalSchedule, ExternalSensor, JobSelector
from dagster.core.scheduler.job import JobStatus

from .utils import UserFacingGraphQLError, capture_error
from .utils import capture_error


@capture_error
Expand Down Expand Up @@ -34,15 +34,16 @@ def get_unloadable_job_states_or_error(graphene_info, job_type=None):

@capture_error
def get_job_state_or_error(graphene_info, selector):
from ..schema.errors import GrapheneJobNotFoundError
from ..schema.jobs import GrapheneJobState

check.inst_param(selector, "selector", JobSelector)
location = graphene_info.context.get_repository_location(selector.location_name)
repository = location.get_repository(selector.repository_name)

external_job = repository.get_external_job(selector.job_name)

if not external_job or not isinstance(external_job, (ExternalSensor, ExternalSchedule)):
raise UserFacingGraphQLError(GrapheneJobNotFoundError(selector.job_name))
check.failed(f"Could not find a definition for {selector.job_name}")

job_state = graphene_info.context.instance.get_job_state(external_job.get_external_origin_id())
if not job_state:
Expand Down
11 changes: 0 additions & 11 deletions python_modules/dagster-graphql/dagster_graphql/schema/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,17 +231,6 @@ def __init__(self, sensor_name):
self.message = f"Could not find `{sensor_name}` in the currently loaded repository."


class GrapheneJobNotFoundError(graphene.ObjectType):
class Meta:
name = "JobNotFoundError"
interfaces = (GrapheneError,)

def __init__(self, job_name):
super().__init__()
self.name = check.str_param(job_name, "job_name")
self.message = f"Job {job_name} is not present in the currently loaded repository."


class GraphenePartitionSetNotFoundError(graphene.ObjectType):
class Meta:
interfaces = (GrapheneError,)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from dagster.core.types.loadable_target_origin import LoadableTargetOrigin
from dagster.grpc.server import GrpcServerProcess
from dagster.utils import merge_dicts
from dagster.utils.test import FilesystemTestScheduler
from dagster.utils.test.postgres_instance import TestPostgresInstance


Expand Down Expand Up @@ -61,6 +62,11 @@ def graphql_postgres_instance(overrides):
"class": "PostgresScheduleStorage",
"config": {"postgres_url": pg_conn_string},
},
"scheduler": {
"module": "dagster.utils.test",
"class": "FilesystemTestScheduler",
"config": {"base_dir": temp_dir},
},
},
overrides if overrides else {},
),
Expand Down Expand Up @@ -100,6 +106,7 @@ def _in_memory_instance():
run_launcher=SyncInMemoryRunLauncher(),
run_coordinator=DefaultRunCoordinator(),
schedule_storage=SqliteScheduleStorage.from_local(temp_dir),
scheduler=FilesystemTestScheduler(temp_dir),
)

return MarkedManager(_in_memory_instance, [Marks.in_memory_instance])
Expand All @@ -118,6 +125,7 @@ def _readonly_in_memory_instance():
run_launcher=ExplodingRunLauncher(),
run_coordinator=DefaultRunCoordinator(),
schedule_storage=SqliteScheduleStorage.from_local(temp_dir),
scheduler=FilesystemTestScheduler(temp_dir),
)

return MarkedManager(
Expand Down Expand Up @@ -200,6 +208,11 @@ def _sqlite_instance():
with instance_for_test_tempdir(
temp_dir,
overrides={
"scheduler": {
"module": "dagster.utils.test",
"class": "FilesystemTestScheduler",
"config": {"base_dir": temp_dir},
},
"run_coordinator": {
"module": "dagster.core.run_coordinator.queued_run_coordinator",
"class": "QueuedRunCoordinator",
Expand Down Expand Up @@ -289,6 +302,7 @@ def _sqlite_asset_instance():
compute_log_manager=LocalComputeLogManager(temp_dir),
run_coordinator=DefaultRunCoordinator(),
run_launcher=SyncInMemoryRunLauncher(),
scheduler=FilesystemTestScheduler(temp_dir),
)
yield instance

Expand Down Expand Up @@ -384,6 +398,29 @@ def _mgr_fn(recon_repo):

return MarkedManager(_mgr_fn, [Marks.multi_location])

@staticmethod
def lazy_repository():
@contextmanager
def _mgr_fn(recon_repo):
"""Goes out of process but same process as host process"""
check.inst_param(recon_repo, "recon_repo", ReconstructableRepository)

with Workspace(
[
ManagedGrpcPythonEnvRepositoryLocationOrigin(
loadable_target_origin=LoadableTargetOrigin(
executable_path=sys.executable,
python_file=file_relative_path(__file__, "setup.py"),
attribute="test_dict_repo",
),
location_name="test",
),
]
) as workspace:
yield workspace

return MarkedManager(_mgr_fn, [Marks.lazy_repository])


class Marks:
# Instance type makes
Expand All @@ -402,6 +439,7 @@ class Marks:
multi_location = pytest.mark.multi_location
managed_grpc_env = pytest.mark.managed_grpc_env
deployed_grpc_env = pytest.mark.deployed_grpc_env
lazy_repository = pytest.mark.lazy_repository

# Asset-aware sqlite variants
asset_aware_instance = pytest.mark.asset_aware_instance
Expand Down Expand Up @@ -587,6 +625,14 @@ def readonly_sqlite_instance_multi_location():
test_id="readonly_sqlite_instance_multi_location",
)

@staticmethod
def readonly_sqlite_instance_lazy_repository():
return GraphQLContextVariant(
InstanceManagers.readonly_sqlite_instance(),
EnvironmentManagers.lazy_repository(),
test_id="readonly_sqlite_instance_lazy_repository",
)

@staticmethod
def readonly_sqlite_instance_managed_grpc_env():
return GraphQLContextVariant(
Expand Down Expand Up @@ -619,6 +665,14 @@ def readonly_postgres_instance_multi_location():
test_id="readonly_postgres_instance_multi_location",
)

@staticmethod
def readonly_postgres_instance_lazy_repository():
return GraphQLContextVariant(
InstanceManagers.readonly_postgres_instance(),
EnvironmentManagers.lazy_repository(),
test_id="readonly_postgres_instance_lazy_repository",
)

@staticmethod
def readonly_postgres_instance_managed_grpc_env():
return GraphQLContextVariant(
Expand All @@ -643,6 +697,14 @@ def readonly_in_memory_instance_multi_location():
test_id="readonly_in_memory_instance_multi_location",
)

@staticmethod
def readonly_in_memory_instance_lazy_repository():
return GraphQLContextVariant(
InstanceManagers.readonly_in_memory_instance(),
EnvironmentManagers.lazy_repository(),
test_id="readonly_in_memory_instance_lazy_repository",
)

@staticmethod
def readonly_in_memory_instance_managed_grpc_env():
return GraphQLContextVariant(
Expand Down Expand Up @@ -681,13 +743,16 @@ def all_variants():
GraphQLContextVariant.readonly_in_memory_instance_in_process_env(),
GraphQLContextVariant.readonly_in_memory_instance_multi_location(),
GraphQLContextVariant.readonly_in_memory_instance_managed_grpc_env(),
GraphQLContextVariant.readonly_in_memory_instance_lazy_repository(),
GraphQLContextVariant.readonly_sqlite_instance_in_process_env(),
GraphQLContextVariant.readonly_sqlite_instance_multi_location(),
GraphQLContextVariant.readonly_sqlite_instance_managed_grpc_env(),
GraphQLContextVariant.readonly_sqlite_instance_deployed_grpc_env(),
GraphQLContextVariant.readonly_sqlite_instance_lazy_repository(),
GraphQLContextVariant.readonly_postgres_instance_in_process_env(),
GraphQLContextVariant.readonly_postgres_instance_multi_location(),
GraphQLContextVariant.readonly_postgres_instance_managed_grpc_env(),
GraphQLContextVariant.readonly_postgres_instance_lazy_repository(),
GraphQLContextVariant.consolidated_sqlite_instance_in_process_env(),
]

Expand All @@ -714,6 +779,10 @@ def all_readonly_variants():
"""
return _variants_with_mark(GraphQLContextVariant.all_variants(), pytest.mark.readonly)

@staticmethod
def all_multi_location_variants():
return _variants_with_mark(GraphQLContextVariant.all_variants(), pytest.mark.multi_location)


def _variants_with_mark(variants, mark):
def _yield_all():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1115,46 +1115,57 @@ def empty_repo():
return []


def define_pipelines():
return [
composites_pipeline,
csv_hello_world_df_input,
csv_hello_world_two,
csv_hello_world_with_expectations,
csv_hello_world,
eventually_successful,
hard_failer,
hello_world_with_tags,
infinite_loop_pipeline,
materialization_pipeline,
more_complicated_config,
more_complicated_nested_config,
multi_asset_pipeline,
multi_mode_with_loggers,
multi_mode_with_resources,
naughty_programmer_pipeline,
no_config_chain_pipeline,
no_config_pipeline,
noop_pipeline,
partitioned_asset_pipeline,
pipeline_with_enum_config,
pipeline_with_expectations,
pipeline_with_invalid_definition_error,
pipeline_with_list,
required_resource_pipeline,
retry_multi_input_early_terminate_pipeline,
retry_multi_output_pipeline,
retry_resource_pipeline,
scalar_output_pipeline,
single_asset_pipeline,
spew_pipeline,
tagged_pipeline,
chained_failure_pipeline,
dynamic_pipeline,
]


@repository
def test_repo():
return (
[
composites_pipeline,
csv_hello_world_df_input,
csv_hello_world_two,
csv_hello_world_with_expectations,
csv_hello_world,
eventually_successful,
hard_failer,
hello_world_with_tags,
infinite_loop_pipeline,
materialization_pipeline,
more_complicated_config,
more_complicated_nested_config,
multi_asset_pipeline,
multi_mode_with_loggers,
multi_mode_with_resources,
naughty_programmer_pipeline,
no_config_chain_pipeline,
no_config_pipeline,
noop_pipeline,
partitioned_asset_pipeline,
pipeline_with_enum_config,
pipeline_with_expectations,
pipeline_with_invalid_definition_error,
pipeline_with_list,
required_resource_pipeline,
retry_multi_input_early_terminate_pipeline,
retry_multi_output_pipeline,
retry_resource_pipeline,
scalar_output_pipeline,
single_asset_pipeline,
spew_pipeline,
tagged_pipeline,
chained_failure_pipeline,
dynamic_pipeline,
]
+ define_schedules()
+ define_sensors()
+ define_partitions()
)
return define_pipelines() + define_schedules() + define_sensors() + define_partitions()


@repository
def test_dict_repo():
return {
"pipelines": {pipeline.name: pipeline for pipeline in define_pipelines()},
"schedules": {schedule.name: schedule for schedule in define_schedules()},
"sensors": {sensor.name: sensor for sensor in define_sensors()},
"partition_sets": {
partition_set.name: partition_set for partition_set in define_partitions()
},
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,16 @@ def test_readonly_marks_filter():
var.test_id
for var in [
GraphQLContextVariant.readonly_in_memory_instance_in_process_env(),
GraphQLContextVariant.readonly_in_memory_instance_lazy_repository(),
GraphQLContextVariant.readonly_in_memory_instance_multi_location(),
GraphQLContextVariant.readonly_in_memory_instance_managed_grpc_env(),
GraphQLContextVariant.readonly_sqlite_instance_in_process_env(),
GraphQLContextVariant.readonly_sqlite_instance_lazy_repository(),
GraphQLContextVariant.readonly_sqlite_instance_multi_location(),
GraphQLContextVariant.readonly_sqlite_instance_managed_grpc_env(),
GraphQLContextVariant.readonly_sqlite_instance_deployed_grpc_env(),
GraphQLContextVariant.readonly_postgres_instance_in_process_env(),
GraphQLContextVariant.readonly_postgres_instance_lazy_repository(),
GraphQLContextVariant.readonly_postgres_instance_multi_location(),
GraphQLContextVariant.readonly_postgres_instance_managed_grpc_env(),
]
Expand Down
Loading

0 comments on commit 33ac358

Please sign in to comment.