
Support some of the DDLs for schema-based-sharding from any node #8523

Open
onurctirtir wants to merge 36 commits into release-14.0 from
release-14.0-ddl-mx-improvements

Conversation


@onurctirtir onurctirtir commented Mar 17, 2026

Note that all the commits are reviewed and first merged into https://github.com/citusdata/citus/tree/feature/ddl-from-any-node-schema-based-sharding, from #8461, #8464, #8478, #8486 and #8506, except:

  • The first commit, which bumps the Citus version to 14.1.0.
  • The last one, which fixes up the upgrade / downgrade paths for the Citus internal UDFs that were added / modified.

Once this PR passes all mixed-version cluster tests and is merged into "release-14.0", I'll create another PR to merge all the changes into the "main" branch as well.

One can also check individual commits / commit messages to better understand the changes.


DESCRIPTION: Support some of the DDLs for schema-based-sharding from any node.

What's supported in general?

Add support for the following from any node in general, regardless of whether
the command targets a distributed-schema / table or not:

  • create schema
  • drop schema
  • alter schema rename to / owner to
  • create / drop / alter view

Note that we have already been supporting the following:

  • create / drop / alter role
  • truncate
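As a rough illustration (the object names here are made up, not from the PR), all of the following can now be issued while connected to any node, coordinator or worker:

```sql
-- illustrative objects; previously these DDLs had to run on the coordinator
CREATE SCHEMA sales;
ALTER SCHEMA sales RENAME TO sales_eu;
ALTER SCHEMA sales_eu OWNER TO app_owner;

CREATE VIEW sales_eu.recent_orders AS SELECT 1 AS id;
DROP VIEW sales_eu.recent_orders;
DROP SCHEMA sales_eu;
```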

What's supported for distributed-schemas / tables?

Add support for the following from any node when the command targets a
distributed-schema / table:

  • "create table" variants, including, e.g., sequence-based columns, initial
    foreign keys, partitioning relationships and many other attributes that can
    be defined at "create table" time.
  • drop table
  • alter table, including but not limited to:
    • "alter table set schema" to move a Postgres / Citus managed local table
      into a distributed-schema from any node
    • "alter table attach / detach partition" to manage partitioning
      relationships between distributed-schema tables within the same schema
    • "alter table add / drop / set column", etc.
  • create / drop / alter index
  • create / drop / alter trigger

Also remove the limitations that were inherited from regular distributed tables
around the usage of triggers with distributed-schema tables; i.e.,
citus.enable_unsafe_triggers is no longer required for distributed-schema
tables, as is already the case for Citus managed local tables.
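A hedged sketch of what this enables, assuming schema-based sharding is enabled via the citus.enable_schema_based_sharding GUC (the schema / table / function names are illustrative):

```sql
-- run from any node, not just the coordinator
SET citus.enable_schema_based_sharding TO on;
CREATE SCHEMA dist_app;  -- becomes a distributed schema

CREATE TABLE dist_app.events (
    id bigserial PRIMARY KEY,
    payload text
);

CREATE INDEX events_payload_idx ON dist_app.events (payload);

CREATE FUNCTION dist_app.noop_trigger() RETURNS trigger AS $$
BEGIN RETURN NEW; END $$ LANGUAGE plpgsql;

-- no citus.enable_unsafe_triggers needed for distributed-schema tables
CREATE TRIGGER events_trig BEFORE INSERT ON dist_app.events
    FOR EACH ROW EXECUTE FUNCTION dist_app.noop_trigger();
```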

Important changes at the high-level

  • Now we call EnsurePropagationToCoordinator() instead of EnsureCoordinator() in
    relevant code-paths to support a DDL from any node.
  • Instead of SendCommandToWorkersWithMetadata(), now we use
    SendCommandToRemoteNodesWithMetadata() in the code-paths where we need to send
    a command to other nodes, e.g., to sync metadata changes when creating a
    distributed-schema table from a worker.
  • When creating a new distributed-schema / distributed-schema table, we need to
    fetch the next colocation id, shard id and shard placement id. With this PR,
    whenever we need to consume one of those sequences, we fetch the next sequence
    value from the coordinator and perform the rest of the operations on the
    current node. This is because we've never synced those sequences to workers,
    so we consider their coordinator versions the source of truth.
  • Preserve sequence range, nextval, and initial value behavior when creating /
    altering distributed-schema tables from workers.
  • Make sure to acquire central locks on the coordinator even when an operation
    is initiated from a worker.
  • We also properly support replicating reference tables when creating a
    distributed-schema table from a worker.

For the last three points, please also see the next sections for more details.

Significant changes for sequences

In Postgres, a column can be associated with a sequence in three ways:

  • Explicit default using nextval('sequence_name')
  • Serial pseudo-types (smallserial, serial, bigserial)
  • Identity columns (GENERATED ... AS IDENTITY)
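For concreteness, the three forms look like this in plain Postgres (table and sequence names are illustrative):

```sql
CREATE SEQUENCE order_id_seq;

CREATE TABLE orders (
    -- 1) explicit default using nextval()
    id bigint DEFAULT nextval('order_id_seq'),
    -- 2) serial pseudo-type (creates an implicit sequence)
    line_no serial,
    -- 3) identity column
    batch_no bigint GENERATED BY DEFAULT AS IDENTITY
);
```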

Such columns can be used in Citus tables in three scenarios:

  • The column existed before the table was distributed
  • The column was added after distribution
  • The column was altered to use a sequence after distribution

When Citus tables are created or altered on the coordinator, we enforce the
following:

  • The coordinator is always allowed to use the full sequence range.
  • If the sequence is bigint-based, the allowed sequence range is split across
    workers based on node group ids.
    Otherwise, workers are effectively prevented from using the sequence by
    setting its current value to the maximum value.
    See AlterSequenceMinMax().
  • Also, for columns using explicit nextval defaults, worker-side defaults are
    rewritten to worker_nextval() to provide a clearer error when invoked on
    workers.
    For serial and identity columns, the default expression cannot be rewritten,
    but the same sequence range restrictions still apply.
  • Furthermore, since the sequence was originally present on the coordinator,
    its last_value and is_called state are implicitly preserved on the
    coordinator after it's distributed.
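Conceptually, the worker-side restriction resembles the following sketch; the exact boundary arithmetic lives inside AlterSequenceMinMax(), and the values below are made up:

```sql
-- bigint-based sequence on a worker: confine it to a per-node slice
-- derived from the node group id (boundaries here are illustrative)
ALTER SEQUENCE order_id_seq
    MINVALUE 2000000001 MAXVALUE 3000000000
    START WITH 2000000001 RESTART WITH 2000000001;

-- non-bigint-based sequence on a worker: effectively disable it by
-- advancing it to its maximum value
SELECT setval('small_id_seq', 2147483647, true);
```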

And we want to preserve the same behavior when distributed-schema tables are
created or altered from workers, hence the code changes introduced in this PR:

  • We make sure to always use nextval() on the coordinator when the column has a
    default expression that uses nextval(), regardless of the data type.
  • We make sure to execute AlterSequenceMinMax() on the local worker node in
    addition to remote workers.
    That way, we can use the appropriate sequence range on the local worker as
    well if it's bigint-based, or we can prevent the local worker from using the
    sequence otherwise, by setting the current value to the maximum value.
  • We also sync the sequence's last_value and is_called state to the
    coordinator's sequence.
    That way, we can continue using the full range starting from the worker's
    current value on the coordinator after distribution, as is the case when
    creating Citus tables from the coordinator.
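The last point can be pictured with plain Postgres primitives; this is only a conceptual sketch, not the UDF-based mechanism Citus actually uses:

```sql
-- on the worker where the table was created: read the sequence state
SELECT last_value, is_called FROM my_id_seq;

-- on the coordinator: apply that state, so nextval() there continues
-- from the worker's current position (1042 / true are illustrative)
SELECT setval('my_id_seq', 1042, true);
```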

Finally, the UDFs that we send to other nodes to implement some parts of these
changes, namely worker_apply_sequence_command() and
citus_internal.adjust_identity_column_seq_settings(), now need to know whether
they're called on the coordinator or a worker. For these UDFs to properly make
that check, the node group id always needs to be already assigned.
For the coordinator and metadata-synced workers, this is always the case. Note
that, by default, citus_add_node() always syncs metadata to workers when adding
them into the metadata. However, although not recommended, a user can later
execute stop_metadata_sync_to_node() for a worker to stop metadata syncing to
that worker.

In that case, before this PR, we were resetting the node group id to 0 for
workers with metadata sync stopped, where 0 is assumed to be the coordinator's
node group id almost everywhere in the code-base. So with this PR, we defer
resetting the group id until the worker is actually removed from the metadata.

Ideally, we should not be sending commands to create / alter sequences to
workers without metadata, because we never use sequences in shard-level queries
and DMLs, but only when querying shell tables. However, changing this would
require more work, and deferring the reset of the node group id until the
worker is removed from the metadata seems like the correct thing to do anyway.

Also, to preserve compatibility in mixed-version clusters, we keep the older
variant of worker_apply_sequence_command(), i.e., the one that takes 2
parameters rather than 4, and worker_adjust_identity_column_seq_ranges(), which
is the ancestor of citus_internal.adjust_identity_column_seq_settings(). When
creating or altering a distributed table from the coordinator, we keep sending
these older variants to the workers. The most significant difference between
the older and newer variants is that the newer ones also send the sequence's
last_value and is_called state to the node they're sent to. We don't actually
need that when creating or altering a distributed table from the coordinator,
and using the newer variants from there could cause failures in mixed-version
clusters, e.g., when one of the workers is still on an older version of Citus.

Significant changes around Citus locks

Now we acquire the following locks on the coordinator when creating a
distributed-schema table from a worker:

  • advisory colocation id lock
  • advisory placement colocation lock
  • pg_dist_node table lock

This is because, for several operations that today can only be initiated from
the coordinator, such as shard-moves, we rely on those locks being held on the
coordinator to detect a concurrent operation that could interfere with ours.

Note that in code-paths where we're sure the operation cannot be initiated from
a worker, we don't check whether we're on a worker at all and always acquire
the lock locally. In other words, we check whether we need to remotely acquire
those locks on the coordinator only in the code-paths that can be initiated
from a worker, e.g., the operations around distributed-schemas / tables that
this PR adds support for.

Note that we don't care about the "colocation default lock". Even though it
appears, e.g., in CreateCitusTable(), that code-path is by definition never
taken when creating a distributed-schema table.
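As a conceptual sketch of remote lock acquisition (Citus uses its own internal lock tags and connection machinery, not these literal calls), a worker can hold a lock on the coordinator by taking a transaction-level advisory lock over a connection that stays open until the transaction ends:

```sql
-- executed over a connection opened from the worker to the coordinator;
-- the lock is held until this remote transaction commits or aborts
BEGIN;
SELECT pg_advisory_xact_lock(42);  -- 42 stands in for, e.g., a colocation id key
-- ... perform the worker-initiated operation ...
COMMIT;  -- releases the transactional advisory lock
```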

Support implicitly replicating reference tables from workers

We also add support for replicating reference tables to all nodes when creating
a distributed-schema table from a worker, in case the reference tables are not
yet replicated to all nodes. To do that, in addition to acquiring the necessary
locks on the coordinator as mentioned above, we make sure to connect to the
coordinator and initiate the shard-copy operations from there.

Future improvements

  • Implementation of the mechanisms to avoid "avoidable-distributed-deadlocks".
  • Supporting other DDLs / UDFs around distributed-schemas / tables and related
    objects from any node:
    • "create / alter / drop etc." of the independent objects, like types and
      functions, that users might use with distributed-schema tables are not
      supported.
    • "create / alter / drop etc." of statistics / policies / rules after
      table creation are not supported.
      • Although we do allow creating tables with such initial objects /
        configs at create-table time, and altering them later to the extent
        that alter table allows.
    • Limitations that we had for some "alter table" subcommands are also
      inherited.
    • "alter table set schema" to move a distributed-schema table into a regular
      or another distributed-schema is not supported.
      • This is because this will require supporting underlying functionality of
        undistribute_table() from workers.
    • distributed-schema management UDFs like citus_schema_distribute /
      citus_schema_undistribute are not supported.
    • Any other DDLs / UDFs not listed here are not yet supported either.

…ate distributed-schema tables

- create table
- create table as
- create table partition of
- create table partitioned by
Even before this change, the run_command_on_master_and_workers_temp() UDF that
we were using to create those views was silently failing when creating the
views on workers. This is because "EXECUTE p_sql;" on the coordinator was
already propagating the command to the workers. However, we were not checking
"PERFORM run_command_on_workers(p_sql);" for errors, so any failure there was
ignored.
.. not when we stop syncing metadata to it.

Resetting the local group id means setting it to 0, which is the same as
setting it to COORDINATOR_GROUP_ID. But when we stop syncing metadata to a
worker, we don't want it to assume it's the coordinator until we actually
remove it, as it's still part of the cluster and can be re-added later. This
mostly doesn't cause an issue because we distinguish between the commands to be
sent to worker nodes with metadata vs without metadata, so we mostly don't
send a command to a worker without metadata if the command has to check,
e.g., whether the node is the coordinator or not.

However, it seems that in some cases we don't make such a distinction.
For example, we create sequences that a table depends on due to having a
serial column or using a column that defaults to nextval('my_sequence')
on all remote nodes, including the ones that we marked as
"hasmetadata = false", when creating prerequisite objects using
PropagatePrerequisiteObjectsForDistributedTable() before creating shards.
This actually doesn't make much sense, because a non-MX worker doesn't need
sequences: we always resolve nextval() calls on the coordinator and MX workers
before preparing shard-level queries, so we never need sequences when executing
shard-level queries on non-MX workers. And with the next commit,
worker_apply_sequence_command() will need to check whether the node it's sent
to is the coordinator during execution, and doing so would be problematic if we
reset the local group id when stopping metadata sync to a worker, as such a
worker would then assume it's the coordinator and would try to execute
coordinator-specific logic in worker_apply_sequence_command() too.
…buted-schema tables from workers

A further refactor should ideally make the rest of the code-base use
SetLocalEnableDDLPropagation() as well.

Another refactor should ideally get rid of the extra args of
AlterSequenceMinMax().

Also, ideally, the sequence-processing related parts of
PostprocessAlterTableStmt should be moved into a separate function.
We never meant to support this UDF from workers because
EnsureReferenceTablesExistOnAllNodesExtended() always assumed that it's run
from the coordinator. That might change very soon, but we still wouldn't want
to support replicate_reference_tables() from workers, for scoping purposes.
During the tests where we create distributed-schema tables from a
worker node, altering citus.next_shard_id / citus.next_placement_id
using run_command_on_coordinator() doesn't guarantee that we'll use the
values we set them to, e.g., if the citus internal connection that we
use to connect to the coordinator changes. For this reason, let's always
directly alter pg_dist_shardid_seq / pg_dist_placement_placementid_seq
at the system level by connecting to the coordinator.
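A sketch of that approach; the sequence names are the actual Citus catalog sequences, but the restart value is made up:

```sql
-- run over a direct connection to the coordinator, since the coordinator's
-- copies of these sequences are the source of truth
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART WITH 1230000;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART WITH 1230000;
```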
…un from a worker

Given the recent support added to create distributed-schema tables from
any node; in the code-paths that can now acquire a colocation id lock
from a worker, make sure to acquire the lock on the coordinator, maybe
by using a remote connection that won't be closed until the end of the
transaction.

EnsureReferenceTablesExistOnAllNodesExtended() is also one of those
code-paths, but normally it needs to release the lock at certain points,
so it'll be handled in a separate commit differently. The code-paths
that are handled in this commit are the ones that don't need to release
the lock until the end of the transaction.
…n from a worker

Given the support for some operations from any node; in the code-paths that
can acquire a lock on pg_dist_node from a worker, make sure to acquire the
lock on the coordinator as well, maybe by using a remote connection that
won't be closed until the end of the transaction.

EnsureReferenceTablesExistOnAllNodesExtended() is also one of those
code-paths but it'll be handled in a separate commit.
…on is run from a worker

Given the recent support added to create distributed-schema tables from
any node; in the code-paths that can now acquire a placement colocation
lock from a worker, make sure to acquire the lock on the coordinator,
maybe by using a remote connection that won't be closed until the end of
the transaction.
…ing a distributed-schema table from a worker

Acquire a transactional advisory lock and release it by committing the
remote transaction because, in Postgres, there is no other way to release
a transactional advisory lock from a separate command.
…mand via session-level conn

.. during isolation tests.

See the code comments.
…node

- Add tests for reference table replication as well as
  a failure test for #6592.
- Make sure to test create / drop schema / table for non-super
  user in schema_based_sharding_from_workers_a.sql.
- Add more non-super user tests.
- Add tests for the cases when the relation name is not qualified.
- Add tests for schema / table names that need proper quoting.
- Add more tests for foreign key constraints.
- Add isolation tests.

codecov bot commented Mar 17, 2026

Codecov Report

❌ Patch coverage is 88.02721% with 88 lines in your changes missing coverage. Please review.
✅ Project coverage is 88.74%. Comparing base (fe25f68) to head (e0b28e4).

Additional details and impacted files
@@               Coverage Diff                @@
##           release-14.0    #8523      +/-   ##
================================================
+ Coverage         88.72%   88.74%   +0.02%     
================================================
  Files               287      287              
  Lines             63266    63802     +536     
  Branches           7931     8009      +78     
================================================
+ Hits              56133    56622     +489     
- Misses             4864     4877      +13     
- Partials           2269     2303      +34     

