Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ When adding or modifying features, prefer extending existing packages before cre
- `pkg/manager/config/` - Auto-reloads configuration files and provides interfaces to query them.
- `pkg/manager/elect/` - Manages TiProxy owner elections (for example, metrics reader and VIP modules need an owner).
- `pkg/manager/id/` - Generates global IDs.
- `pkg/manager/backendcluster/` - Manages cluster-scoped backend runtimes and shared resources such as PD or etcd clients.
- `pkg/manager/infosync/` - Queries the topology of TiDB and Prometheus from PD and updates TiProxy information to PD.
- `pkg/manager/logger/` - Manages the logger service.
- `pkg/manager/memory/` - Records heap and goroutine profiles when memory usage is high.
Expand Down
28 changes: 21 additions & 7 deletions pkg/balance/observer/backend_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,46 @@ type BackendFetcher interface {
// TopologyFetcher is an interface to fetch the tidb topology from ETCD.
type TopologyFetcher interface {
GetTiDBTopology(ctx context.Context) (map[string]*infosync.TiDBTopologyInfo, error)
// HasBackendClusters reports whether dynamic PD-backed clusters are configured at all.
// PDFetcher uses it to preserve the legacy behavior that static backend.instances still work
// when TiProxy starts without any PD cluster and clusters are added later through the API.
HasBackendClusters() bool
}

// PDFetcher fetches backend list from PD.
type PDFetcher struct {
tpFetcher TopologyFetcher
logger *zap.Logger
config *config.HealthCheck
static *StaticFetcher
}

func NewPDFetcher(tpFetcher TopologyFetcher, logger *zap.Logger, config *config.HealthCheck) *PDFetcher {
func NewPDFetcher(tpFetcher TopologyFetcher, staticAddrs []string, logger *zap.Logger, config *config.HealthCheck) *PDFetcher {
config.Check()
return &PDFetcher{
tpFetcher: tpFetcher,
logger: logger,
config: config,
static: NewStaticFetcher(staticAddrs),
}
}

func (pf *PDFetcher) GetBackendList(ctx context.Context) (map[string]*BackendInfo, error) {
// Keep backward compatibility with the legacy static-namespace flow: before any backend cluster
// is configured, backend.instances must still be routable even though namespace now always sees
// a non-nil topology fetcher from the cluster manager.
if !pf.tpFetcher.HasBackendClusters() {
return pf.static.GetBackendList(ctx)
}
backends := pf.fetchBackendList(ctx)
infos := make(map[string]*BackendInfo, len(backends))
for addr, backend := range backends {
infos[addr] = &BackendInfo{
Labels: backend.Labels,
IP: backend.IP,
StatusPort: backend.StatusPort,
for key, backend := range backends {
infos[key] = &BackendInfo{
Addr: backend.Addr,
ClusterName: backend.ClusterName,
Labels: backend.Labels,
IP: backend.IP,
StatusPort: backend.StatusPort,
}
}
return infos, nil
Expand Down Expand Up @@ -98,7 +112,7 @@ func (sf *StaticFetcher) GetBackendList(context.Context) (map[string]*BackendInf
func backendListToMap(addrs []string) map[string]*BackendInfo {
backends := make(map[string]*BackendInfo, len(addrs))
for _, addr := range addrs {
backends[addr] = &BackendInfo{}
backends[addr] = &BackendInfo{Addr: addr}
}
return backends
}
48 changes: 47 additions & 1 deletion pkg/balance/observer/backend_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestPDFetcher(t *testing.T) {
{
infos: map[string]*infosync.TiDBTopologyInfo{
"1.1.1.1:4000": {
Addr: "1.1.1.1:4000",
Labels: map[string]string{"k1": "v1"},
IP: "1.1.1.1",
StatusPort: 10080,
Expand All @@ -34,6 +35,7 @@ func TestPDFetcher(t *testing.T) {
check: func(m map[string]*BackendInfo) {
require.Len(t, m, 1)
require.NotNil(t, m["1.1.1.1:4000"])
require.Equal(t, "1.1.1.1:4000", m["1.1.1.1:4000"].Addr)
require.Equal(t, "1.1.1.1", m["1.1.1.1:4000"].IP)
require.Equal(t, uint(10080), m["1.1.1.1:4000"].StatusPort)
require.Equal(t, map[string]string{"k1": "v1"}, m["1.1.1.1:4000"].Labels)
Expand All @@ -42,24 +44,43 @@ func TestPDFetcher(t *testing.T) {
{
infos: map[string]*infosync.TiDBTopologyInfo{
"1.1.1.1:4000": {
Addr: "1.1.1.1:4000",
IP: "1.1.1.1",
StatusPort: 10080,
},
"2.2.2.2:4000": {
Addr: "2.2.2.2:4000",
IP: "2.2.2.2",
StatusPort: 10080,
},
},
check: func(m map[string]*BackendInfo) {
require.Len(t, m, 2)
require.NotNil(t, m["1.1.1.1:4000"])
require.Equal(t, "1.1.1.1:4000", m["1.1.1.1:4000"].Addr)
require.Equal(t, "1.1.1.1", m["1.1.1.1:4000"].IP)
require.Equal(t, uint(10080), m["1.1.1.1:4000"].StatusPort)
require.NotNil(t, m["2.2.2.2:4000"])
require.Equal(t, "2.2.2.2:4000", m["2.2.2.2:4000"].Addr)
require.Equal(t, "2.2.2.2", m["2.2.2.2:4000"].IP)
require.Equal(t, uint(10080), m["2.2.2.2:4000"].StatusPort)
},
},
{
infos: map[string]*infosync.TiDBTopologyInfo{
"cluster-a/shared.tidb:4000": {
Addr: "shared.tidb:4000",
IP: "10.0.0.1",
StatusPort: 10080,
},
},
check: func(m map[string]*BackendInfo) {
require.Len(t, m, 1)
require.NotNil(t, m["cluster-a/shared.tidb:4000"])
require.Equal(t, "shared.tidb:4000", m["cluster-a/shared.tidb:4000"].Addr)
require.Equal(t, "10.0.0.1", m["cluster-a/shared.tidb:4000"].IP)
},
},
{
ctx: func() context.Context {
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -74,9 +95,10 @@ func TestPDFetcher(t *testing.T) {

tpFetcher := newMockTpFetcher(t)
lg, _ := logger.CreateLoggerForTest(t)
pf := NewPDFetcher(tpFetcher, lg, newHealthCheckConfigForTest())
pf := NewPDFetcher(tpFetcher, nil, lg, newHealthCheckConfigForTest())
for _, test := range tests {
tpFetcher.infos = test.infos
tpFetcher.hasClusters = true
if test.ctx == nil {
test.ctx = context.Background()
}
Expand All @@ -85,3 +107,27 @@ func TestPDFetcher(t *testing.T) {
require.NoError(t, err)
}
}

func TestPDFetcherFallbackToStaticWithoutBackendClusters(t *testing.T) {
tpFetcher := newMockTpFetcher(t)
lg, _ := logger.CreateLoggerForTest(t)
fetcher := NewPDFetcher(tpFetcher, []string{"127.0.0.1:4000"}, lg, newHealthCheckConfigForTest())

backends, err := fetcher.GetBackendList(context.Background())
require.NoError(t, err)
require.Len(t, backends, 1)
require.Contains(t, backends, "127.0.0.1:4000")

tpFetcher.hasClusters = true
tpFetcher.infos = map[string]*infosync.TiDBTopologyInfo{
"cluster-a/10.0.0.1:4000": {
Addr: "10.0.0.1:4000",
ClusterName: "cluster-a",
},
}
backends, err = fetcher.GetBackendList(context.Background())
require.NoError(t, err)
require.Len(t, backends, 1)
require.Equal(t, "10.0.0.1:4000", backends["cluster-a/10.0.0.1:4000"].Addr)
require.Equal(t, "cluster-a", backends["cluster-a/10.0.0.1:4000"].ClusterName)
}
12 changes: 8 additions & 4 deletions pkg/balance/observer/backend_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,17 @@ func (bh *BackendHealth) String() string {

// BackendInfo stores the status info of each backend.
type BackendInfo struct {
Labels map[string]string
IP string
StatusPort uint
Addr string
ClusterName string
Labels map[string]string
IP string
StatusPort uint
}

func (bi BackendInfo) Equals(other BackendInfo) bool {
return bi.IP == other.IP &&
return bi.Addr == other.Addr &&
bi.ClusterName == other.ClusterName &&
bi.IP == other.IP &&
bi.StatusPort == other.StatusPort &&
maps.Equal(bi.Labels, other.Labels)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/balance/observer/backend_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestBackendHealthToString(t *testing.T) {
{},
{
BackendInfo: BackendInfo{
Addr: "127.0.0.1:4000",
IP: "127.0.0.1",
StatusPort: 1,
Labels: map[string]string{"k1": "v1", "k2": "v2"},
Expand Down Expand Up @@ -45,13 +46,15 @@ func TestBackendHealthEquals(t *testing.T) {
{
a: BackendHealth{
BackendInfo: BackendInfo{
Addr: "127.0.0.1:4000",
IP: "127.0.0.1",
StatusPort: 1,
Labels: map[string]string{"k1": "v1", "k2": "v2"},
},
},
b: BackendHealth{
BackendInfo: BackendInfo{
Addr: "127.0.0.1:4000",
IP: "127.0.0.1",
StatusPort: 1,
},
Expand All @@ -61,13 +64,15 @@ func TestBackendHealthEquals(t *testing.T) {
{
a: BackendHealth{
BackendInfo: BackendInfo{
Addr: "127.0.0.1:4000",
IP: "127.0.0.1",
StatusPort: 1,
Labels: map[string]string{"k1": "v1", "k2": "v2"},
},
},
b: BackendHealth{
BackendInfo: BackendInfo{
Addr: "127.0.0.1:4000",
IP: "127.0.0.1",
StatusPort: 1,
Labels: map[string]string{"k1": "v1", "k2": "v2"},
Expand All @@ -78,6 +83,7 @@ func TestBackendHealthEquals(t *testing.T) {
{
a: BackendHealth{
BackendInfo: BackendInfo{
Addr: "127.0.0.1:4000",
IP: "127.0.0.1",
StatusPort: 1,
Labels: map[string]string{"k1": "v1", "k2": "v2"},
Expand Down
1 change: 1 addition & 0 deletions pkg/balance/observer/backend_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ func (ts *observerTestSuite) addBackend() (string, BackendInfo) {
ts.backendIdx++
addr := fmt.Sprintf("%d", ts.backendIdx)
info := &BackendInfo{
Addr: addr,
IP: "127.0.0.1",
StatusPort: uint(ts.backendIdx),
}
Expand Down
14 changes: 10 additions & 4 deletions pkg/balance/observer/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

// HealthCheck is used to check the backends of one backend. One can pass a customized health check function to the observer.
type HealthCheck interface {
Check(ctx context.Context, addr string, info *BackendInfo, lastHealth *BackendHealth) *BackendHealth
Check(ctx context.Context, backendID string, info *BackendInfo, lastHealth *BackendHealth) *BackendHealth
}

const (
Expand Down Expand Up @@ -62,7 +62,7 @@ func NewDefaultHealthCheck(httpCli *http.Client, cfg *config.HealthCheck, logger
}
}

func (dhc *DefaultHealthCheck) Check(ctx context.Context, addr string, info *BackendInfo, lastBh *BackendHealth) *BackendHealth {
func (dhc *DefaultHealthCheck) Check(ctx context.Context, _ string, info *BackendInfo, lastBh *BackendHealth) *BackendHealth {
bh := &BackendHealth{
BackendInfo: *info,
Healthy: true,
Expand All @@ -80,16 +80,22 @@ func (dhc *DefaultHealthCheck) Check(ctx context.Context, addr string, info *Bac
if !bh.Healthy {
return bh
}
dhc.checkSqlPort(ctx, addr, bh)
dhc.checkSqlPort(ctx, info, bh)
if !bh.Healthy {
return bh
}
dhc.queryConfig(ctx, info, bh, lastBh)
return bh
}

func (dhc *DefaultHealthCheck) checkSqlPort(ctx context.Context, addr string, bh *BackendHealth) {
func (dhc *DefaultHealthCheck) checkSqlPort(ctx context.Context, info *BackendInfo, bh *BackendHealth) {
// Also dial the SQL port just in case that the SQL port hangs.
if info == nil || info.Addr == "" {
bh.Healthy = false
bh.PingErr = errors.New("backend address is empty")
return
}
addr := info.Addr
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(dhc.cfg.RetryInterval), uint64(dhc.cfg.MaxRetries)), ctx)
err := http.ConnectWithRetry(func() error {
startTime := time.Now()
Expand Down
1 change: 1 addition & 0 deletions pkg/balance/observer/health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func newBackendServer(t *testing.T) (*backendServer, *BackendInfo) {
backend.setSqlResp(true)
backend.startSQLServer()
return backend, &BackendInfo{
Addr: backend.sqlAddr,
IP: backend.ip,
StatusPort: backend.statusPort,
}
Expand Down
17 changes: 11 additions & 6 deletions pkg/balance/observer/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import (
)

type mockTpFetcher struct {
t *testing.T
infos map[string]*infosync.TiDBTopologyInfo
err error
t *testing.T
infos map[string]*infosync.TiDBTopologyInfo
err error
hasClusters bool
}

func newMockTpFetcher(t *testing.T) *mockTpFetcher {
Expand All @@ -34,6 +35,10 @@ func (ft *mockTpFetcher) GetTiDBTopology(ctx context.Context) (map[string]*infos
return ft.infos, ft.err
}

func (ft *mockTpFetcher) HasBackendClusters() bool {
return ft.hasClusters
}

type mockBackendFetcher struct {
sync.Mutex
backends map[string]*BackendInfo
Expand Down Expand Up @@ -82,11 +87,11 @@ func newMockHealthCheck() *mockHealthCheck {
}
}

func (mhc *mockHealthCheck) Check(_ context.Context, addr string, info *BackendInfo, _ *BackendHealth) *BackendHealth {
func (mhc *mockHealthCheck) Check(_ context.Context, backendID string, info *BackendInfo, _ *BackendHealth) *BackendHealth {
mhc.Lock()
defer mhc.Unlock()
mhc.backends[addr].BackendInfo = *info
return mhc.backends[addr]
mhc.backends[backendID].BackendInfo = *info
return mhc.backends[backendID]
}

func (mhc *mockHealthCheck) setBackend(addr string, health *BackendHealth) {
Expand Down
Loading