Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1605,6 +1605,10 @@ def plan_builder(
backfill_models = None

models_override: t.Optional[UniqueKeyDict[str, Model]] = None
# FQNs of models that are selected for deletion (present in the deployed environment but
# absent from local files). These are models whose files have been deleted; they will
# appear in context_diff.removed_snapshots rather than as backfill candidates.
selected_deletion_fqns: t.Set[str] = set()
if select_models:
try:
models_override = model_selector.select_models(
Expand All @@ -1622,12 +1626,24 @@ def plan_builder(
# Only backfill selected models unless explicitly specified.
backfill_models = model_selector.expand_model_selections(select_models)

if not backfill_models:
# The selection matched nothing locally. Check whether it matched models that exist
# in the deployed environment but have been deleted locally. If so, the selection is
# valid — the deletions will surface in context_diff.removed_snapshots.
env_selected = model_selector.expand_model_selections_with_env(
select_models,
environment,
fallback_env_name=create_from or c.PROD,
ensure_finalized_snapshots=self.config.plan.use_finalized_state,
)
selected_deletion_fqns = env_selected - set(self._models)

expanded_restate_models = None
if restate_models is not None:
expanded_restate_models = model_selector.expand_model_selections(restate_models)

if (restate_models is not None and not expanded_restate_models) or (
backfill_models is not None and not backfill_models
backfill_models is not None and not backfill_models and not selected_deletion_fqns
):
raise PlanError(
"Selector did not return any models. Please check your model selection and try again."
Expand All @@ -1636,7 +1652,7 @@ def plan_builder(
if always_include_local_changes is None:
# default behaviour - if restatements are detected; we operate entirely out of state and ignore local changes
force_no_diff = restate_models is not None or (
backfill_models is not None and not backfill_models
backfill_models is not None and not backfill_models and not selected_deletion_fqns
)
else:
force_no_diff = not always_include_local_changes
Expand Down
83 changes: 62 additions & 21 deletions sqlmesh/core/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,9 @@ def select_models(
Returns:
A dictionary of models.
"""
target_env = self._state_reader.get_environment(Environment.sanitize_name(target_env_name))
if target_env and target_env.expired:
target_env = None

if not target_env and fallback_env_name:
target_env = self._state_reader.get_environment(
Environment.sanitize_name(fallback_env_name)
)

env_models: t.Dict[str, Model] = {}
if target_env:
environment_snapshot_infos = (
target_env.snapshots
if not ensure_finalized_snapshots
else target_env.finalized_or_current_snapshots
)
env_models = {
s.name: s.model
for s in self._state_reader.get_snapshots(environment_snapshot_infos).values()
if s.is_model
}
env_models = self._load_env_models(
target_env_name, fallback_env_name, ensure_finalized_snapshots
)

all_selected_models = self.expand_model_selections(
model_selections, models={**env_models, **self._models}
Expand Down Expand Up @@ -168,6 +150,65 @@ def get_model(fqn: str) -> t.Optional[Model]:

return models

def expand_model_selections_with_env(
    self,
    model_selections: t.Iterable[str],
    target_env_name: str,
    fallback_env_name: t.Optional[str] = None,
    ensure_finalized_snapshots: bool = False,
) -> t.Set[str]:
    """Resolves selections over the union of environment models and local models.

    Because the environment's models are part of the candidate pool, a selection can
    match a model that no longer exists locally but is still present in the environment.

    Args:
        model_selections: The selection expressions to expand.
        target_env_name: The name of the environment whose models are added to the pool.
        fallback_env_name: The name of the environment to read from when the target
            environment can't be used.
        ensure_finalized_snapshots: Forwarded to the environment model loader; selects
            between the finalized and the current environment snapshots.

    Returns:
        The set of FQNs matched by the selections.
    """
    deployed_models = self._load_env_models(
        target_env_name, fallback_env_name, ensure_finalized_snapshots
    )
    # Local models are layered last so they shadow environment models with the same name.
    candidate_models = {**deployed_models, **self._models}
    return self.expand_model_selections(model_selections, models=candidate_models)

def _load_env_models(
    self,
    target_env_name: str,
    fallback_env_name: t.Optional[str] = None,
    ensure_finalized_snapshots: bool = False,
) -> t.Dict[str, "Model"]:
    """Returns the models recorded in the target environment, keyed by snapshot name.

    Args:
        target_env_name: The name of the environment to read models from.
        fallback_env_name: The name of the environment to read from instead when the
            target environment is missing or expired.
        ensure_finalized_snapshots: When set, uses the environment's finalized-or-current
            snapshots rather than its current snapshots.

    Returns:
        A mapping of snapshot name to model; empty if no usable environment was found.
    """
    reader = self._state_reader

    env = reader.get_environment(Environment.sanitize_name(target_env_name))
    if env and env.expired:
        # An expired target is treated the same as a missing one.
        env = None

    if not env and fallback_env_name:
        env = reader.get_environment(Environment.sanitize_name(fallback_env_name))

    if not env:
        return {}

    if ensure_finalized_snapshots:
        snapshot_infos = env.finalized_or_current_snapshots
    else:
        snapshot_infos = env.snapshots

    # Only snapshots that represent models are included.
    return {
        snapshot.name: snapshot.model
        for snapshot in reader.get_snapshots(snapshot_infos).values()
        if snapshot.is_model
    }

def expand_model_selections(
self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Node]] = None
) -> t.Set[str]:
Expand Down
38 changes: 38 additions & 0 deletions tests/core/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -2273,6 +2273,44 @@ def test_plan_selector_expression_no_match(sushi_context: Context) -> None:
sushi_context.plan("prod", restate_models=["*missing*"])


def test_plan_select_model_deleted_model(sushi_context: Context) -> None:
    """Selecting a model that has been deleted locally but still exists in the deployed
    environment should produce a valid plan with the deletion, not raise PlanError."""
    # Pick a leaf model that can be safely deleted without breaking other models' rendering.
    model_name = "sushi.top_waiters"
    snapshot = sushi_context.get_snapshot(model_name)
    assert snapshot is not None

    # Delete the model file from disk.
    model = sushi_context.get_model(model_name)
    assert model._path.exists()
    model._path.unlink()

    # Reload the context so it no longer knows about the deleted model.
    sushi_context.load()
    # The context's models are keyed by fully qualified name, so the check must use the
    # FQN captured before the deletion. The unqualified "sushi.top_waiters" would never be
    # a key, which would make this assertion pass even if the model were still loaded.
    assert snapshot.name not in sushi_context.models

    # Planning with select_models for the deleted model should succeed (not raise PlanError).
    plan = sushi_context.plan("prod", select_models=[model_name], no_prompts=True)
    assert plan is not None

    # The deleted model should appear in removed_snapshots.
    removed_names = {s.name for s in plan.context_diff.removed_snapshots.values()}
    assert snapshot.name in removed_names


def test_plan_select_model_deleted_model_still_rejects_nonexistent(
    sushi_context: Context,
) -> None:
    """Planning must still fail with PlanError when the selection matches nothing at all,
    i.e. the model is absent both locally and from the deployed environment."""
    expected_error = (
        "Selector did not return any models. Please check your model selection and try again."
    )
    unknown_selection = ["sushi.completely_nonexistent"]

    with pytest.raises(PlanError, match=expected_error):
        sushi_context.plan("prod", select_models=unknown_selection)


def test_plan_on_virtual_update_this_model_in_macro(tmp_path: pathlib.Path):
models_dir = pathlib.Path("models")
macros_dir = pathlib.Path("macros")
Expand Down
127 changes: 127 additions & 0 deletions tests/core/test_selector_native.py
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,133 @@ def test_select_models_local_tags_take_precedence_over_remote(
)


def test_expand_model_selections_with_env(mocker: MockerFixture, make_snapshot):
    """Selections expanded with the environment must also match models that only exist in
    the deployed environment (i.e. were deleted locally), alongside the local ones."""
    env_name = "test_env"

    # One model that is still present locally, and one that only lives in the environment.
    kept = SqlModel(
        name="db.local_model",
        query=d.parse_one("SELECT 1 AS a"),
    )
    removed = SqlModel(
        name="db.deleted_model",
        query=d.parse_one("SELECT 2 AS b"),
    )

    removed_snapshot = make_snapshot(removed)
    removed_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

    # The state reader reports an environment that contains only the removed model.
    state_reader = mocker.Mock()
    state_reader.get_environment.return_value = Environment(
        name=env_name,
        snapshots=[removed_snapshot.table_info],
        start_at="2023-01-01",
        end_at="2023-02-01",
        plan_id="test_plan_id",
    )
    state_reader.get_snapshots.return_value = {
        removed_snapshot.snapshot_id: removed_snapshot,
    }

    local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
    local_models[kept.fqn] = kept

    selector = NativeSelector(state_reader, local_models)
    both = {removed.fqn, kept.fqn}

    # A local-only expansion cannot see the removed model.
    assert selector.expand_model_selections(["db.deleted_model"]) == set()

    # The env-aware expansion finds the removed model ...
    assert removed.fqn in selector.expand_model_selections_with_env(
        ["db.deleted_model"], env_name
    )

    # ... and still reaches the local one.
    assert kept.fqn in selector.expand_model_selections_with_env(["db.local_model"], env_name)

    # Naming both yields both.
    assert (
        selector.expand_model_selections_with_env(
            ["db.deleted_model", "db.local_model"], env_name
        )
        == both
    )

    # Wildcards are matched against environment models as well.
    assert selector.expand_model_selections_with_env(["*_model"], env_name) == both

    # Something unknown everywhere matches nothing.
    assert selector.expand_model_selections_with_env(["db.nonexistent"], env_name) == set()


def test_expand_model_selections_with_env_fallback(mocker: MockerFixture, make_snapshot):
    """When the target environment is unknown, the models of the fallback environment must
    be used for the expansion."""
    removed = SqlModel(
        name="db.deleted_model",
        query=d.parse_one("SELECT 1 AS a"),
    )

    removed_snapshot = make_snapshot(removed)
    removed_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

    prod_env = Environment(
        name="prod",
        snapshots=[removed_snapshot.table_info],
        start_at="2023-01-01",
        end_at="2023-02-01",
        plan_id="test_plan_id",
    )

    def _get_environment(name):
        # Only "prod" is known to the state reader; every other lookup misses.
        return prod_env if name == "prod" else None

    state_reader = mocker.Mock()
    state_reader.get_environment.side_effect = _get_environment
    state_reader.get_snapshots.return_value = {
        removed_snapshot.snapshot_id: removed_snapshot,
    }

    # No local models at all: any match has to come from the fallback environment.
    no_local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
    selector = NativeSelector(state_reader, no_local_models)

    matched = selector.expand_model_selections_with_env(
        ["db.deleted_model"], "missing_env", fallback_env_name="prod"
    )
    assert removed.fqn in matched


def test_expand_model_selections_with_env_expired(mocker: MockerFixture, make_snapshot):
    """Models of an expired environment must not be matched when no fallback is given."""
    removed = SqlModel(
        name="db.deleted_model",
        query=d.parse_one("SELECT 1 AS a"),
    )

    removed_snapshot = make_snapshot(removed)
    removed_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

    # The expiration timestamp lies in the past, so the environment counts as expired.
    stale_env = Environment(
        name="test_env",
        snapshots=[removed_snapshot.table_info],
        start_at="2023-01-01",
        end_at="2023-02-01",
        plan_id="test_plan_id",
        expiration_ts=now_timestamp() - 1,
    )

    state_reader = mocker.Mock()
    state_reader.get_environment.return_value = stale_env
    state_reader.get_snapshots.return_value = {
        removed_snapshot.snapshot_id: removed_snapshot,
    }

    no_local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
    selector = NativeSelector(state_reader, no_local_models)

    assert selector.expand_model_selections_with_env(["db.deleted_model"], "test_env") == set()


def _assert_models_equal(actual: t.Dict[str, Model], expected: t.Dict[str, Model]) -> None:
assert set(actual) == set(expected)
for name, model in actual.items():
Expand Down