Merged
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
@@ -42,3 +42,6 @@ jobs:

- name: Run crash recovery origin-filter test
run: go test -count=1 -v ./tests/integration -run 'TestAdvancedRepairPlan'

- name: Run connection pool cap test
run: go test -count=1 -v ./tests/integration -run 'TestRepsetDiff_MaxConnectionsCap'
1 change: 1 addition & 0 deletions ace.yaml
@@ -28,6 +28,7 @@ table_diff:
diff_batch_size: 1
max_diff_batch_size: 1000
compare_unit_size: 10000
max_connections: 0 # max DB connections per node (0 = derive from concurrency factor)

mtree:
cdc:
3 changes: 3 additions & 0 deletions docs/api.md
@@ -33,6 +33,7 @@ Compares a table between nodes and generates a diff report.
| `--output` | `-o` | Output format (`json` or `html`) | json |
| `--nodes` | `-n` | Nodes to include in the diff (comma-separated, or "all") | all |
| `--table-filter` | `-F` | `WHERE` clause expression to use while diffing tables | |
| `--max-connections` | `-M` | Maximum database connections per node | derived |
| `--override-block-size`| `-B` | Allow block sizes outside `ace.yaml` guardrails | false |
| `--quiet` | `-q` | Suppress progress output | false |
| `--debug` | `-v` | Enable debug logging | false |
@@ -147,6 +148,7 @@ Compares schemas across nodes in a pgEdge cluster. By default, `schema-diff` per
| `--output` | `-o` | Output format (`json` or `html`) | json |
| `--nodes` | `-n` | Nodes to include in the diff (comma-separated, or "all") | all |
| `--table-filter` | `-F` | `WHERE` clause expression to use while diffing tables | |
| `--max-connections` | `-M` | Maximum database connections per node | derived |
| `--override-block-size`| `-B` | Allow block sizes outside `ace.yaml` guardrails | false |
| `--quiet` | `-q` | Suppress progress output | false |
| `--debug` | `-v` | Enable debug logging | false |
@@ -189,6 +191,7 @@ Performs a `table-diff` on every table in a replication set and reports differen
| `--concurrency-factor` | `-c` | CPU ratio for concurrency (0.0–4.0) | 0.5 |
| `--compare-unit-size` | `-u` | Recursive split size for mismatched blocks | 10000 |
| `--output` | `-o` | Per-table diff output format (`json` or `html`) | json |
| `--max-connections` | `-M` | Maximum database connections per node | derived |
| `--override-block-size` | `-B` | Allow block sizes outside `ace.yaml` guardrails | false |
| `--quiet` | `-q` | Suppress output | false |
| `--debug` | `-v` | Enable debug logging | false |
1 change: 1 addition & 0 deletions docs/commands/diff/repset-diff.md
@@ -25,6 +25,7 @@ Runs `table-diff` on every table in a replication set and reports differences.
| `--concurrency-factor <float>` | `-c` | CPU ratio for concurrency (0.0–4.0). Default `0.5`. |
| `--compare-unit-size <int>` | `-u` | Recursive split size for mismatched blocks. Default `10000`. |
| `--output <json\|html>` | `-o` | Per-table diff report format. Default `json`. |
| `--max-connections <int>` | `-M` | Maximum database connections per node. Caps the pool regardless of concurrency factor. Default: derived from `--concurrency-factor`. |
| `--override-block-size` | `-B` | Allow block sizes outside `ace.yaml` guardrails. |
| `--quiet` | `-q` | Suppress output | `false` |
| `--debug` | `-v` | Debug logging | `false` |
1 change: 1 addition & 0 deletions docs/commands/diff/table-diff.md
@@ -24,6 +24,7 @@ This command compares the data in the specified table across nodes in a cluster
| `--table-filter <WHERE>` | `-F` | Optional SQL `WHERE` clause applied on every node before hashing. |
| `--against-origin <node>` | | Limit the diff to rows whose `node_origin` matches this Spock node id or name (useful for failed-node recovery). |
| `--until <timestamp>` | | Optional commit timestamp fence (RFC3339) applied with `--against-origin` and `--table-filter`; excludes newer rows. |
| `--max-connections <int>` | `-M` | Maximum database connections per node. When set, caps the connection pool regardless of concurrency factor. Default: derived from `--concurrency-factor`. |
| `--override-block-size` | `-B` | Skip block-size safety checks defined in `ace.yaml`. |
| `--quiet` | `-q` | Suppress progress output. Results still write to the diff file. |
| `--debug` | `-v` | Enable verbose logging. |
1 change: 1 addition & 0 deletions docs/configuration.md
@@ -32,6 +32,7 @@ The [`ace.yaml` file](https://github.com/pgEdge/ace/blob/main/ace.yaml) defines
| table_diff --> min_diff_block_size | Minimum diff block (row chunk) size. **Default: 1** |
| table_diff --> max_diff_block_size | Maximum diff block size. **Default: 1000000** |
| table_diff --> compare_unit_size | Unit size for smallest comparison chunk. **Default: 10000** |
| table_diff --> max_connections | Maximum database connections per node during diff operations. When set, caps the connection pool regardless of concurrency factor. **Default: 0** (derive from concurrency factor) |
| mtree → cdc --> slot_name | Logical decoding slot name for mtree CDC. **Default: "ace_mtree_slot"** |
| mtree → cdc --> publication_name | Publication used for mtree CDC. **Default: "ace_mtree_pub"** |
| mtree → cdc --> cdc_processing_timeout | CDC processing timeout (s). **Default: 30** |
3 changes: 2 additions & 1 deletion docs/design/table_diff.md
@@ -162,7 +162,8 @@ flowchart LR
### Resource Utilisation and Tuning

- **block_size**: Larger blocks reduce hash tasks and recursion but increase memory/IO per hash and slow mismatch localisation; smaller blocks do the opposite (more queries, finer locality).
- **concurrency_factor**: CPU ratio (0.0–4.0) that scales workers relative to `NumCPU` (e.g. 0.5 on a 16-CPU host spawns 8 workers). Higher values speed up hashing but put more load on DB backends, network, and local CPU, and can contend with other workloads and connection limits. The connection pool per node is sized to match the worker count (minimum 4).
- **max_connections**: Hard cap on the connection pool size per node. When set, overrides the concurrency-derived pool size. Useful for environments with limited `max_connections` on the database server. Workers that exceed the pool size will queue for a connection rather than fail.
- **compare_unit_size**: Lower values push recursion deeper (more queries, smaller fetches); higher values stop earlier (fewer queries, larger fetches on mismatched ranges).
- **max_diff_rows**: Early-exit guardrail. Lower caps keep runs short and reports small on divergent tables; raising/removing can grow memory and report size when drift is large.
- **table_filter**: Narrows scope and cost; enables accurate `COUNT(*)` on the filtered view. Must be identical across nodes to avoid false positives.
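
The interaction between `concurrency_factor` and `max_connections` described in the list above can be sketched as follows. This is an illustration only, not ACE's implementation: the helper name `poolSize`, the truncating conversion from the CPU ratio to a worker count, and the choice to let the cap win even when it is below the floor of 4 are all assumptions.

```go
package main

import (
	"fmt"
	"runtime"
)

// minPoolSize mirrors the "minimum 4" floor mentioned in the tuning notes.
const minPoolSize = 4

// poolSize is a hypothetical helper. It derives the per-node pool size from
// the worker count and treats maxConnections as an upper bound when it is > 0.
// A value of 0 means "no cap, use the derived size".
func poolSize(numCPU int, concurrencyFactor float64, maxConnections int) int {
	// Assumption: the worker count is the truncated product of CPUs and ratio.
	workers := int(float64(numCPU) * concurrencyFactor)

	size := workers
	if size < minPoolSize {
		size = minPoolSize
	}
	// Assumption: the cap applies last, so it can go below the floor.
	if maxConnections > 0 && size > maxConnections {
		size = maxConnections
	}
	return size
}

func main() {
	fmt.Println(poolSize(16, 0.5, 0))  // derived from 8 workers
	fmt.Println(poolSize(4, 0.5, 0))   // floor applies
	fmt.Println(poolSize(64, 2.0, 10)) // cap applies
	fmt.Println(poolSize(runtime.NumCPU(), 0.5, 0))
}
```

With a cap in place, raising `concurrency_factor` still adds workers, but those workers share the capped pool.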
3 changes: 3 additions & 0 deletions docs/http-api.md
@@ -93,6 +93,7 @@ Request body:
| `concurrency_factor` | float | no | CPU ratio (0.0–4.0). Defaults to `table_diff.concurrency_factor` or `0.5`. |
| `compare_unit_size` | int | no | Defaults to `table_diff.compare_unit_size` or `10000`. |
| `max_diff_rows` | int64 | no | Defaults to `table_diff.max_diff_rows` or `0` (no limit). |
| `max_connections` | int | no | Max DB connections per node. Defaults to `table_diff.max_connections` or derived from concurrency factor. |
| `table_filter` | string | no | SQL `WHERE` predicate (without `WHERE`). |
| `override_block_size` | bool | no | Bypass block-size guardrails. |
| `quiet` | bool | no | Suppress progress output. |
@@ -192,6 +193,7 @@ Request body:
| `block_size` | int | no | Defaults to `table_diff.diff_block_size` or `100000`. |
| `concurrency_factor` | float | no | CPU ratio (0.0–4.0). Defaults to `table_diff.concurrency_factor` or `0.5`. |
| `compare_unit_size` | int | no | Defaults to `table_diff.compare_unit_size` or `10000`. |
| `max_connections` | int | no | Max DB connections per node. Defaults to `table_diff.max_connections` or derived from concurrency factor. |
| `output` | string | no | `json` (default) or `html`. |
| `override_block_size` | bool | no | Bypass block-size guardrails. |
| `quiet` | bool | no | Suppress output. |
@@ -213,6 +215,7 @@ Request body:
| `block_size` | int | no | Defaults to `table_diff.diff_block_size` or `100000`. |
| `concurrency_factor` | float | no | CPU ratio (0.0–4.0). Defaults to `table_diff.concurrency_factor` or `0.5`. |
| `compare_unit_size` | int | no | Defaults to `table_diff.compare_unit_size` or `10000`. |
| `max_connections` | int | no | Max DB connections per node. Defaults to `table_diff.max_connections` or derived from concurrency factor. |
| `output` | string | no | `json` (default) or `html`. |
| `override_block_size` | bool | no | Bypass block-size guardrails. |
| `quiet` | bool | no | Suppress output. |
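
As a rough illustration of how a client might set `max_connections` in a request body, the sketch below marshals a payload using only field names that appear in the tables above. The struct name `repsetDiffBody` and the helper `buildBody` are hypothetical, and any required fields that fall outside this excerpt are omitted, so the output is not a complete request.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// repsetDiffBody is a hypothetical client-side struct. It lists only fields
// shown in the request tables above; other fields are left out.
type repsetDiffBody struct {
	BlockSize       int     `json:"block_size,omitempty"`
	Concurrency     float64 `json:"concurrency_factor,omitempty"`
	CompareUnitSize int     `json:"compare_unit_size,omitempty"`
	MaxConnections  int     `json:"max_connections,omitempty"`
	Output          string  `json:"output,omitempty"`
}

// buildBody returns the JSON payload for a given connection cap.
// A cap of 0 is omitted, which leaves the server to fall back to
// table_diff.max_connections or the concurrency-derived size.
func buildBody(maxConns int) (string, error) {
	b, err := json.Marshal(repsetDiffBody{
		Concurrency:    2.0,
		MaxConnections: maxConns,
		Output:         "json",
	})
	return string(b), err
}

func main() {
	body, err := buildBody(8)
	if err != nil {
		panic(err)
	}
	fmt.Println(body)
}
```

Omitting the field and sending `0` are equivalent on the server side, since both decode to the zero value of the `int` field.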
9 changes: 9 additions & 0 deletions docs/openapi.yaml
@@ -465,6 +465,9 @@ components:
max_diff_rows:
type: integer
description: Defaults to table_diff.max_diff_rows or 0 (no limit).
max_connections:
type: integer
description: Maximum database connections per node. Defaults to table_diff.max_connections or derived from concurrency factor.
table_filter:
type: string
description: SQL WHERE predicate (without WHERE).
@@ -567,6 +570,9 @@ components:
compare_unit_size:
type: integer
description: Defaults to table_diff.compare_unit_size or 10000.
max_connections:
type: integer
description: Maximum database connections per node. Defaults to table_diff.max_connections or derived from concurrency factor.
output:
type: string
enum: [json, html]
@@ -601,6 +607,9 @@ components:
compare_unit_size:
type: integer
description: Defaults to table_diff.compare_unit_size or 10000.
max_connections:
type: integer
description: Maximum database connections per node. Defaults to table_diff.max_connections or derived from concurrency factor.
output:
type: string
enum: [json, html]
3 changes: 3 additions & 0 deletions docs/performance.md
@@ -34,6 +34,9 @@ When invoking [ACE commands](commands/index.md), review the available command op
- **`--compare-unit-size`**
Sets the minimum block size used when ACE recursively drills into mismatched blocks. Smaller values provide more granular comparisons at the cost of additional round-trips. Default is `10000`.

- **`--max-connections`**
Caps the number of database connections ACE opens per node. By default, the pool size is derived from `--concurrency-factor` and the number of CPUs. On machines with many cores, this can result in a large number of connections. Use `--max-connections` to set a hard upper limit, or set `table_diff.max_connections` in `ace.yaml` to apply it globally. This is especially useful when running ACE against databases with limited `max_connections` or when sharing the database with other applications.

- **`--override-block-size` (`-B`)**
Override the safety limits defined in `ace.yaml`. Use cautiously: extremely large blocks can lead to `array_agg` memory pressure.
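
To make the effect of a connection cap concrete, the following sketch simulates more workers than connections. It uses a buffered channel as a stand-in for the pool, which is an assumption for illustration and not how ACE itself manages connections (ACE holds a `pgxpool.Pool`). The function name `runWithCap` and the 5 ms simulated query are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runWithCap starts `workers` goroutines that each need one "connection" from
// a pool of size maxConns. It returns the peak number of connections held at
// the same time and the number of workers that completed.
func runWithCap(workers, maxConns int) (peak int32, done int32) {
	pool := make(chan struct{}, maxConns) // stand-in for a connection pool
	var inUse int32
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			pool <- struct{}{} // waits here while all connections are busy

			// Record the highest number of connections in use at once.
			cur := atomic.AddInt32(&inUse, 1)
			for {
				p := atomic.LoadInt32(&peak)
				if cur <= p || atomic.CompareAndSwapInt32(&peak, p, cur) {
					break
				}
			}

			time.Sleep(5 * time.Millisecond) // simulated query
			atomic.AddInt32(&inUse, -1)
			<-pool // release the connection
			atomic.AddInt32(&done, 1)
		}()
	}
	wg.Wait()
	return peak, done
}

func main() {
	peak, done := runWithCap(8, 2)
	fmt.Printf("peak connections: %d, workers finished: %d\n", peak, done)
}
```

All eight workers finish, but never more than two hold a connection at once. The trade-off is throughput: workers spend time waiting, so a very low cap can cancel out the benefit of a high `--concurrency-factor`.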

19 changes: 19 additions & 0 deletions internal/api/http/handler.go
@@ -26,6 +26,7 @@ type tableDiffRequest struct {
Concurrency float64 `json:"concurrency_factor"`
CompareUnitSize int `json:"compare_unit_size"`
MaxDiffRows int64 `json:"max_diff_rows"`
MaxConnections int `json:"max_connections"`
TableFilter string `json:"table_filter"`
OverrideBlockSize bool `json:"override_block_size"`
Quiet bool `json:"quiet"`
@@ -76,6 +77,7 @@ type schemaDiffRequest struct {
BlockSize int `json:"block_size"`
Concurrency float64 `json:"concurrency_factor"`
CompareUnitSize int `json:"compare_unit_size"`
MaxConnections int `json:"max_connections"`
Output string `json:"output"`
OverrideBlockSize bool `json:"override_block_size"`
Quiet bool `json:"quiet"`
@@ -91,6 +93,7 @@ type repsetDiffRequest struct {
BlockSize int `json:"block_size"`
Concurrency float64 `json:"concurrency_factor"`
CompareUnitSize int `json:"compare_unit_size"`
MaxConnections int `json:"max_connections"`
Output string `json:"output"`
OverrideBlockSize bool `json:"override_block_size"`
Quiet bool `json:"quiet"`
@@ -212,6 +215,7 @@ func (s *APIServer) handleTableDiff(w http.ResponseWriter, r *http.Request) {
task.ConcurrencyFactor = s.resolveConcurrency(cfg, req.Concurrency)
task.CompareUnitSize = s.resolveCompareUnitSize(cfg, req.CompareUnitSize)
task.MaxDiffRows = s.resolveMaxDiffRows(cfg, req.MaxDiffRows)
task.MaxConnections = s.resolveMaxConnections(cfg, req.MaxConnections)
task.Output = "json"
task.Nodes = s.resolveNodes(req.Nodes)
task.TableFilter = strings.TrimSpace(req.TableFilter)
@@ -291,6 +295,19 @@ func (s *APIServer) resolveMaxDiffRows(cfg *config.Config, requested int64) int6
return 0
}

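// resolveMaxConnections picks the per-node connection cap for a request.
// A positive request value wins. A negative value is passed through unchanged
// so that the task's Validate() can reject it. Otherwise the ace.yaml value
// (table_diff.max_connections) applies; 0 means "derive from concurrency factor".
func (s *APIServer) resolveMaxConnections(cfg *config.Config, requested int) int {
	if requested > 0 {
		return requested
	}
	if requested < 0 {
		return requested // rejected by Validate()
	}
	if cfg != nil && cfg.TableDiff.MaxConnections > 0 {
		return cfg.TableDiff.MaxConnections
	}
	return 0
}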
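
The precedence that `resolveMaxConnections` applies (request value, then `ace.yaml`, then 0) can be exercised in isolation with a small standalone sketch. The type `tableDiffConfig` and the function `resolve` below are hypothetical, trimmed-down stand-ins for the real `config.Config` and the `APIServer` method, written only to show the decision order.

```go
package main

import "fmt"

// tableDiffConfig is a hypothetical stand-in for the table_diff section
// of ace.yaml; only the field relevant here is included.
type tableDiffConfig struct {
	MaxConnections int
}

// resolve follows the same decision order as resolveMaxConnections:
// a non-zero request value is returned as-is (negative values are left for
// validation to reject), then the config value, then 0.
func resolve(cfg *tableDiffConfig, requested int) int {
	if requested != 0 {
		return requested
	}
	if cfg != nil && cfg.MaxConnections > 0 {
		return cfg.MaxConnections
	}
	return 0
}

func main() {
	cfg := &tableDiffConfig{MaxConnections: 12}

	fmt.Println(resolve(cfg, 4))  // request value wins
	fmt.Println(resolve(cfg, 0))  // falls back to the config value
	fmt.Println(resolve(nil, 0))  // nothing set: 0, pool size is derived
	fmt.Println(resolve(cfg, -1)) // passed through for Validate() to reject
}
```

Passing negative values through, rather than silently clamping them, means the caller gets an explicit validation error instead of an unexpected pool size.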

func (s *APIServer) resolveNodes(nodes []string) string {
if len(nodes) == 0 {
return "all"
@@ -577,6 +594,7 @@ func (s *APIServer) handleSchemaDiff(w http.ResponseWriter, r *http.Request) {
task.BlockSize = s.resolveBlockSize(cfg, req.BlockSize)
task.ConcurrencyFactor = s.resolveConcurrency(cfg, req.Concurrency)
task.CompareUnitSize = s.resolveCompareUnitSize(cfg, req.CompareUnitSize)
task.MaxConnections = s.resolveMaxConnections(cfg, req.MaxConnections)
task.Output = strings.TrimSpace(req.Output)
if task.Output == "" {
task.Output = "json"
@@ -659,6 +677,7 @@ func (s *APIServer) handleRepsetDiff(w http.ResponseWriter, r *http.Request) {
task.BlockSize = s.resolveBlockSize(cfg, req.BlockSize)
task.ConcurrencyFactor = s.resolveConcurrency(cfg, req.Concurrency)
task.CompareUnitSize = s.resolveCompareUnitSize(cfg, req.CompareUnitSize)
task.MaxConnections = s.resolveMaxConnections(cfg, req.MaxConnections)
task.Output = strings.TrimSpace(req.Output)
if task.Output == "" {
task.Output = "json"
9 changes: 9 additions & 0 deletions internal/cli/cli.go
@@ -102,6 +102,12 @@ func SetupCLI() *cli.Command {
Usage: "Override block size",
Value: false,
},
&cli.IntFlag{
Name: "max-connections",
Aliases: []string{"M"},
Usage: "Maximum number of database connections per node (default: derived from concurrency factor)",
Value: 0,
},
}

rerunOnlyFlags := []cli.Flag{
@@ -879,6 +885,7 @@ func TableDiffCLI(cmd *cli.Command) error {
task.DBName = cmd.String("dbname")
task.BlockSize = int(blockSizeInt)
task.ConcurrencyFactor = cmd.Float64("concurrency-factor")
task.MaxConnections = cmd.Int("max-connections")
task.CompareUnitSize = cmd.Int("compare-unit-size")
task.Output = strings.ToLower(cmd.String("output"))
task.Nodes = cmd.String("nodes")
@@ -1275,6 +1282,7 @@ func SchemaDiffCLI(cmd *cli.Command) error {

task.BlockSize = int(blockSizeInt)
task.ConcurrencyFactor = cmd.Float64("concurrency-factor")
task.MaxConnections = cmd.Int("max-connections")
task.CompareUnitSize = cmd.Int("compare-unit-size")
task.Output = cmd.String("output")
task.OverrideBlockSize = cmd.Bool("override-block-size")
@@ -1350,6 +1358,7 @@ func RepsetDiffCLI(cmd *cli.Command) error {

task.BlockSize = int(blockSizeInt)
task.ConcurrencyFactor = cmd.Float64("concurrency-factor")
task.MaxConnections = cmd.Int("max-connections")
task.CompareUnitSize = cmd.Int("compare-unit-size")
task.Output = cmd.String("output")
task.OverrideBlockSize = cmd.Bool("override-block-size")
6 changes: 6 additions & 0 deletions internal/consistency/diff/repset_diff.go
@@ -50,6 +50,7 @@ type RepsetDiffCmd struct {
database types.Database
ConnectionPool *pgxpool.Pool
ConcurrencyFactor float64
MaxConnections int
BlockSize int
CompareUnitSize int
Output string
@@ -230,6 +231,9 @@ func (c *RepsetDiffCmd) Validate() error {
if c.RepsetName == "" {
return fmt.Errorf("repset name is required")
}
if c.MaxConnections < 0 {
return fmt.Errorf("max_connections must be >= 1 (or 0 to derive from concurrency factor)")
}
return nil
}

@@ -367,6 +371,7 @@ func RepsetDiff(task *RepsetDiffCmd) (err error) {
tdTask.Nodes = task.Nodes
tdTask.QualifiedTableName = tableName
tdTask.ConcurrencyFactor = task.ConcurrencyFactor
tdTask.MaxConnections = task.MaxConnections
tdTask.BlockSize = task.BlockSize
tdTask.CompareUnitSize = task.CompareUnitSize
tdTask.Output = task.Output
@@ -422,6 +427,7 @@ func (task *RepsetDiffCmd) CloneForSchedule(ctx context.Context) *RepsetDiffCmd
clone.Quiet = task.Quiet
clone.BlockSize = task.BlockSize
clone.ConcurrencyFactor = task.ConcurrencyFactor
clone.MaxConnections = task.MaxConnections
clone.CompareUnitSize = task.CompareUnitSize
clone.Output = task.Output
clone.TableFilter = task.TableFilter
7 changes: 7 additions & 0 deletions internal/consistency/diff/schema_diff.go
@@ -52,6 +52,7 @@ type SchemaDiffCmd struct {
database types.Database
ConnectionPool *pgxpool.Pool
ConcurrencyFactor float64
MaxConnections int
BlockSize int
CompareUnitSize int
Output string
@@ -177,6 +178,10 @@ func (c *SchemaDiffCmd) Validate() error {
return fmt.Errorf("schema-diff needs at least two nodes to compare")
}

if c.MaxConnections < 0 {
return fmt.Errorf("max_connections must be >= 1 (or 0 to derive from concurrency factor)")
}

return nil
}

@@ -523,6 +528,7 @@ func (task *SchemaDiffCmd) SchemaTableDiff() (err error) {
tdTask.Nodes = task.Nodes
tdTask.QualifiedTableName = qualifiedTableName
tdTask.ConcurrencyFactor = task.ConcurrencyFactor
tdTask.MaxConnections = task.MaxConnections
tdTask.BlockSize = task.BlockSize
tdTask.CompareUnitSize = task.CompareUnitSize
tdTask.Output = task.Output
@@ -577,6 +583,7 @@ func (task *SchemaDiffCmd) CloneForSchedule(ctx context.Context) *SchemaDiffCmd
clone.DDLOnly = task.DDLOnly
clone.BlockSize = task.BlockSize
clone.ConcurrencyFactor = task.ConcurrencyFactor
clone.MaxConnections = task.MaxConnections
clone.CompareUnitSize = task.CompareUnitSize
clone.Output = task.Output
clone.TableFilter = task.TableFilter