diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index dc51aad2a7..3e493bb4ff 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -1605,6 +1605,10 @@ def plan_builder( backfill_models = None models_override: t.Optional[UniqueKeyDict[str, Model]] = None + # FQNs of models that are selected for deletion (present in the deployed environment but + # absent from local files). These are models whose files have been deleted; they will + # appear in context_diff.removed_snapshots rather than as backfill candidates. + selected_deletion_fqns: t.Set[str] = set() if select_models: try: models_override = model_selector.select_models( @@ -1622,12 +1626,24 @@ def plan_builder( # Only backfill selected models unless explicitly specified. backfill_models = model_selector.expand_model_selections(select_models) + if not backfill_models: + # The selection matched nothing locally. Check whether it matched models that exist + # in the deployed environment but have been deleted locally. If so, the selection is + # valid — the deletions will surface in context_diff.removed_snapshots. + env_selected = model_selector.expand_model_selections_with_env( + select_models, + environment, + fallback_env_name=create_from or c.PROD, + ensure_finalized_snapshots=self.config.plan.use_finalized_state, + ) + selected_deletion_fqns = env_selected - set(self._models) + expanded_restate_models = None if restate_models is not None: expanded_restate_models = model_selector.expand_model_selections(restate_models) if (restate_models is not None and not expanded_restate_models) or ( - backfill_models is not None and not backfill_models + backfill_models is not None and not backfill_models and not selected_deletion_fqns ): raise PlanError( "Selector did not return any models. Please check your model selection and try again." @@ -1636,7 +1652,7 @@ def plan_builder( if always_include_local_changes is None: # default behaviour - if restatements are detected; we operate entirely out of state and ignore local changes force_no_diff = restate_models is not None or ( - backfill_models is not None and not backfill_models + backfill_models is not None and not backfill_models and not selected_deletion_fqns ) else: force_no_diff = not always_include_local_changes diff --git a/sqlmesh/core/selector.py b/sqlmesh/core/selector.py index 9eaf4995c8..f8c9fb6506 100644 --- a/sqlmesh/core/selector.py +++ b/sqlmesh/core/selector.py @@ -78,27 +78,9 @@ def select_models( Returns: A dictionary of models. """ - target_env = self._state_reader.get_environment(Environment.sanitize_name(target_env_name)) - if target_env and target_env.expired: - target_env = None - - if not target_env and fallback_env_name: - target_env = self._state_reader.get_environment( - Environment.sanitize_name(fallback_env_name) - ) - - env_models: t.Dict[str, Model] = {} - if target_env: - environment_snapshot_infos = ( - target_env.snapshots - if not ensure_finalized_snapshots - else target_env.finalized_or_current_snapshots - ) - env_models = { - s.name: s.model - for s in self._state_reader.get_snapshots(environment_snapshot_infos).values() - if s.is_model - } + env_models = self._load_env_models( + target_env_name, fallback_env_name, ensure_finalized_snapshots + ) all_selected_models = self.expand_model_selections( model_selections, models={**env_models, **self._models} @@ -168,6 +150,65 @@ def get_model(fqn: str) -> t.Optional[Model]: return models + def expand_model_selections_with_env( + self, + model_selections: t.Iterable[str], + target_env_name: str, + fallback_env_name: t.Optional[str] = None, + ensure_finalized_snapshots: bool = False, + ) -> t.Set[str]: + """Expands model selections against both local models and the target environment. + + This allows selections to match models that have been deleted locally but still + exist in the deployed environment. + + Args: + model_selections: A set of selections. + target_env_name: The name of the target environment. + fallback_env_name: The name of the fallback environment that will be used if the target + environment doesn't exist. + ensure_finalized_snapshots: Whether to source environment snapshots from the latest finalized + environment state, or to use whatever snapshots are in the current environment state even if + the environment is not finalized. + + Returns: + A set of matched model FQNs. + """ + env_models = self._load_env_models( + target_env_name, fallback_env_name, ensure_finalized_snapshots + ) + return self.expand_model_selections(model_selections, models={**env_models, **self._models}) + + def _load_env_models( + self, + target_env_name: str, + fallback_env_name: t.Optional[str] = None, + ensure_finalized_snapshots: bool = False, + ) -> t.Dict[str, "Model"]: + """Loads models from the target environment, falling back to the fallback environment if needed.""" + target_env = self._state_reader.get_environment(Environment.sanitize_name(target_env_name)) + if target_env and target_env.expired: + target_env = None + + if not target_env and fallback_env_name: + target_env = self._state_reader.get_environment( + Environment.sanitize_name(fallback_env_name) + ) + + if not target_env: + return {} + + environment_snapshot_infos = ( + target_env.snapshots + if not ensure_finalized_snapshots + else target_env.finalized_or_current_snapshots + ) + return { + s.name: s.model + for s in self._state_reader.get_snapshots(environment_snapshot_infos).values() + if s.is_model + } + def expand_model_selections( self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Node]] = None ) -> t.Set[str]: diff --git a/tests/core/test_context.py b/tests/core/test_context.py index c3d88e205e..50e5f656a7 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -2273,6 +2273,44 @@ def test_plan_selector_expression_no_match(sushi_context: Context) -> None: sushi_context.plan("prod", restate_models=["*missing*"]) +def test_plan_select_model_deleted_model(sushi_context: Context) -> None: + """Selecting a model that has been deleted locally but still exists in the deployed + environment should produce a valid plan with the deletion, not raise PlanError.""" + # Pick a leaf model that can be safely deleted without breaking other models' rendering. + model_name = "sushi.top_waiters" + snapshot = sushi_context.get_snapshot(model_name) + assert snapshot is not None + + # Delete the model file from disk. + model = sushi_context.get_model(model_name) + assert model._path.exists() + model._path.unlink() + + # Reload the context so it no longer knows about the deleted model. + sushi_context.load() + assert model_name not in [m for m in sushi_context.models] + + # Planning with select_models for the deleted model should succeed (not raise PlanError). + plan = sushi_context.plan("prod", select_models=[model_name], no_prompts=True) + assert plan is not None + + # The deleted model should appear in removed_snapshots. + removed_names = {s.name for s in plan.context_diff.removed_snapshots.values()} + assert snapshot.name in removed_names + + +def test_plan_select_model_deleted_model_still_rejects_nonexistent( + sushi_context: Context, +) -> None: + """A model that neither exists locally nor in the deployed environment should still + raise PlanError.""" + with pytest.raises( + PlanError, + match="Selector did not return any models. Please check your model selection and try again.", + ): + sushi_context.plan("prod", select_models=["sushi.completely_nonexistent"]) + + def test_plan_on_virtual_update_this_model_in_macro(tmp_path: pathlib.Path): models_dir = pathlib.Path("models") macros_dir = pathlib.Path("macros") diff --git a/tests/core/test_selector_native.py b/tests/core/test_selector_native.py index 5889efadda..8c4886791e 100644 --- a/tests/core/test_selector_native.py +++ b/tests/core/test_selector_native.py @@ -801,6 +801,133 @@ def test_select_models_local_tags_take_precedence_over_remote( ) +def test_expand_model_selections_with_env(mocker: MockerFixture, make_snapshot): + """expand_model_selections_with_env should include models from the deployed environment, + even if they have been deleted locally.""" + local_model = SqlModel( + name="db.local_model", + query=d.parse_one("SELECT 1 AS a"), + ) + deleted_model = SqlModel( + name="db.deleted_model", + query=d.parse_one("SELECT 2 AS b"), + ) + + deleted_model_snapshot = make_snapshot(deleted_model) + deleted_model_snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + env_name = "test_env" + + state_reader_mock = mocker.Mock() + state_reader_mock.get_environment.return_value = Environment( + name=env_name, + snapshots=[deleted_model_snapshot.table_info], + start_at="2023-01-01", + end_at="2023-02-01", + plan_id="test_plan_id", + ) + state_reader_mock.get_snapshots.return_value = { + deleted_model_snapshot.snapshot_id: deleted_model_snapshot, + } + + local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models") + local_models[local_model.fqn] = local_model + + selector = NativeSelector(state_reader_mock, local_models) + + # Expanding against local models only should NOT find the deleted model. + assert selector.expand_model_selections(["db.deleted_model"]) == set() + + # Expanding with env should find the deleted model. + result = selector.expand_model_selections_with_env(["db.deleted_model"], env_name) + assert deleted_model.fqn in result + + # Local model should also be reachable. + result = selector.expand_model_selections_with_env(["db.local_model"], env_name) + assert local_model.fqn in result + + # Selecting both should return both. + result = selector.expand_model_selections_with_env( + ["db.deleted_model", "db.local_model"], env_name + ) + assert result == {deleted_model.fqn, local_model.fqn} + + # Wildcard should match env models too. + result = selector.expand_model_selections_with_env(["*_model"], env_name) + assert result == {deleted_model.fqn, local_model.fqn} + + # Non-existent model should return empty. + result = selector.expand_model_selections_with_env(["db.nonexistent"], env_name) + assert result == set() + + +def test_expand_model_selections_with_env_fallback(mocker: MockerFixture, make_snapshot): + """expand_model_selections_with_env should fall back to the fallback environment.""" + deleted_model = SqlModel( + name="db.deleted_model", + query=d.parse_one("SELECT 1 AS a"), + ) + + deleted_model_snapshot = make_snapshot(deleted_model) + deleted_model_snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + fallback_env = Environment( + name="prod", + snapshots=[deleted_model_snapshot.table_info], + start_at="2023-01-01", + end_at="2023-02-01", + plan_id="test_plan_id", + ) + + state_reader_mock = mocker.Mock() + state_reader_mock.get_environment.side_effect = ( + lambda name: fallback_env if name == "prod" else None + ) + state_reader_mock.get_snapshots.return_value = { + deleted_model_snapshot.snapshot_id: deleted_model_snapshot, + } + + local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models") + selector = NativeSelector(state_reader_mock, local_models) + + result = selector.expand_model_selections_with_env( + ["db.deleted_model"], "missing_env", fallback_env_name="prod" + ) + assert deleted_model.fqn in result + + +def test_expand_model_selections_with_env_expired(mocker: MockerFixture, make_snapshot): + """expand_model_selections_with_env should ignore expired environments.""" + deleted_model = SqlModel( + name="db.deleted_model", + query=d.parse_one("SELECT 1 AS a"), + ) + + deleted_model_snapshot = make_snapshot(deleted_model) + deleted_model_snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + expired_env = Environment( + name="test_env", + snapshots=[deleted_model_snapshot.table_info], + start_at="2023-01-01", + end_at="2023-02-01", + plan_id="test_plan_id", + expiration_ts=now_timestamp() - 1, + ) + + state_reader_mock = mocker.Mock() + state_reader_mock.get_environment.return_value = expired_env + state_reader_mock.get_snapshots.return_value = { + deleted_model_snapshot.snapshot_id: deleted_model_snapshot, + } + + local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models") + selector = NativeSelector(state_reader_mock, local_models) + + result = selector.expand_model_selections_with_env(["db.deleted_model"], "test_env") + assert result == set() + + def _assert_models_equal(actual: t.Dict[str, Model], expected: t.Dict[str, Model]) -> None: assert set(actual) == set(expected) for name, model in actual.items():