Commit 4d88c637 authored by Vladimir Barsukov's avatar Vladimir Barsukov
Browse files

add ping try & fix least

parent f3979db5
...@@ -8,8 +8,10 @@ import ( ...@@ -8,8 +8,10 @@ import (
type Conn struct { type Conn struct {
*pgxpool.Pool *pgxpool.Pool
Index int
Mode connMode Mode connMode
Alive bool Alive bool
PingTry int
} }
type connMode int type connMode int
......
//go:build dev
package zdb
import "log"
func logConnStat(conn *Conn) {
log.Printf("\033[9%dm%s\u001B[0m -> idle: %d; acquired: %d, total: %d, max: %d, ratio: %0.2f",
conn.Index+1,
conn.ToString(),
conn.Stat().IdleConns(),
conn.Stat().AcquiredConns(),
conn.Stat().TotalConns(),
conn.Stat().MaxConns(),
float64(conn.Stat().AcquiredConns())/float64(conn.Stat().MaxConns()),
)
}
...@@ -5,14 +5,14 @@ go 1.20 ...@@ -5,14 +5,14 @@ go 1.20
require ( require (
github.com/georgysavva/scany/v2 v2.0.0 github.com/georgysavva/scany/v2 v2.0.0
github.com/google/uuid v1.3.0 github.com/google/uuid v1.3.0
github.com/jackc/pgx/v5 v5.4.2 github.com/jackc/pgx/v5 v5.4.3
) )
require ( require (
github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect
golang.org/x/crypto v0.11.0 // indirect golang.org/x/crypto v0.12.0 // indirect
golang.org/x/sync v0.3.0 // indirect golang.org/x/sync v0.3.0 // indirect
golang.org/x/text v0.11.0 // indirect golang.org/x/text v0.12.0 // indirect
) )
...@@ -10,8 +10,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI ...@@ -10,8 +10,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.4.2 h1:u1gmGDwbdRUZiwisBm/Ky2M14uQyUP65bG8+20nnyrg= github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY=
github.com/jackc/pgx/v5 v5.4.2/go.mod h1:q6iHT8uDNXWiFNOlRqJzBTaSH3+2xCXkokxHZC5qWFY= github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/lib/pq v1.10.0 h1:Zx5DJFEYQXio93kgXnQ09fXNiUKsqv4OUEu2UtGcB1E= github.com/lib/pq v1.10.0 h1:Zx5DJFEYQXio93kgXnQ09fXNiUKsqv4OUEu2UtGcB1E=
...@@ -23,13 +23,13 @@ github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= ...@@ -23,13 +23,13 @@ github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
...@@ -38,6 +38,7 @@ type Pool struct { ...@@ -38,6 +38,7 @@ type Pool struct {
TryOnSleep time.Duration TryOnSleep time.Duration
Balance Balance Balance Balance
PingTimout time.Duration PingTimout time.Duration
PingTry int
} }
func New(ctx context.Context) *Pool { func New(ctx context.Context) *Pool {
...@@ -53,6 +54,7 @@ func New(ctx context.Context) *Pool { ...@@ -53,6 +54,7 @@ func New(ctx context.Context) *Pool {
TryOnSleep: time.Second, TryOnSleep: time.Second,
Balance: BalanceLeastConn, Balance: BalanceLeastConn,
PingTimout: time.Second * 5, PingTimout: time.Second * 5,
PingTry: 5,
} }
} }
...@@ -82,6 +84,7 @@ func (d *Pool) WithContext(ctx context.Context) *Pool { ...@@ -82,6 +84,7 @@ func (d *Pool) WithContext(ctx context.Context) *Pool {
TryOnSleep: d.TryOnSleep, TryOnSleep: d.TryOnSleep,
Balance: d.Balance, Balance: d.Balance,
PingTimout: d.PingTimout, PingTimout: d.PingTimout,
PingTry: d.PingTry,
} }
} }
...@@ -107,8 +110,10 @@ func (d *Pool) NewConn(mode connMode, pgConnString string) error { ...@@ -107,8 +110,10 @@ func (d *Pool) NewConn(mode connMode, pgConnString string) error {
case ConnModeMaster: case ConnModeMaster:
d.SrvMaster = q d.SrvMaster = q
case ConnModeSync: case ConnModeSync:
q.Index = len(d.SrvSync)
d.SrvSync = append(d.SrvSync, q) d.SrvSync = append(d.SrvSync, q)
case ConnModeAsync: case ConnModeAsync:
q.Index = len(d.SrvAsync)
d.SrvAsync = append(d.SrvAsync, q) d.SrvAsync = append(d.SrvAsync, q)
default: default:
return errors.New("unknown mode") return errors.New("unknown mode")
...@@ -162,6 +167,24 @@ func (d *Pool) newConn(mode connMode, pgConnString string) (q *Conn, err error) ...@@ -162,6 +167,24 @@ func (d *Pool) newConn(mode connMode, pgConnString string) (q *Conn, err error)
return q, nil return q, nil
} }
func (d *Pool) least(s []*Conn) *Conn {
var out *Conn
var m float64 = 0
for i, conn := range s {
ratio := float64(conn.Stat().AcquiredConns()) / float64(conn.Stat().MaxConns())
if ratio < m || i == 0 {
m = ratio
out = conn
}
}
logConnStat(out)
return out
}
func (d *Pool) sync() *Conn { func (d *Pool) sync() *Conn {
if len(d.SrvSync) == 0 { if len(d.SrvSync) == 0 {
return d.SrvMaster return d.SrvMaster
...@@ -178,17 +201,7 @@ func (d *Pool) sync() *Conn { ...@@ -178,17 +201,7 @@ func (d *Pool) sync() *Conn {
return d.SrvSync[d.slavesIter.Add(1)%int64(len(d.SrvSync))] return d.SrvSync[d.slavesIter.Add(1)%int64(len(d.SrvSync))]
} }
var out *Conn return d.least(d.SrvSync)
var min int32 = 1 << 30
for _, conn := range d.SrvSync {
if conn.Stat().AcquiredConns() < min {
min = conn.Stat().AcquiredConns()
out = conn
}
}
return out
} }
func (d *Pool) async() *Conn { func (d *Pool) async() *Conn {
...@@ -207,17 +220,7 @@ func (d *Pool) async() *Conn { ...@@ -207,17 +220,7 @@ func (d *Pool) async() *Conn {
return d.SrvAsync[d.slavesAsyncIter.Add(1)%int64(len(d.SrvAsync))] return d.SrvAsync[d.slavesAsyncIter.Add(1)%int64(len(d.SrvAsync))]
} }
var out *Conn return d.least(d.SrvAsync)
var min int32 = 1 << 30
for _, conn := range d.SrvAsync {
if conn.Stat().AcquiredConns() < min {
min = conn.Stat().AcquiredConns()
out = conn
}
}
return out
} }
func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *Conn, dst1 any) error) error { func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *Conn, dst1 any) error) error {
...@@ -239,7 +242,7 @@ func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *Conn, dst1 any) ...@@ -239,7 +242,7 @@ func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *Conn, dst1 any)
if try < d.TryOnError && contains(err.Error(), d.ContinuesTry) { if try < d.TryOnError && contains(err.Error(), d.ContinuesTry) {
try++ try++
d.logger.Printf("DB_EXEC_WRAPPER_REPEAT_ERR: SRV: %s TRY: %d; %s", q.ToString(), try, err.Error()) d.logger.Printf("ZDB_EXEC_WRAPPER_REPEAT_ERR: SRV: %s TRY: %d; %s", q.ToString(), try, err.Error())
time.Sleep(d.TryOnSleep) time.Sleep(d.TryOnSleep)
goto repeat goto repeat
...@@ -247,7 +250,7 @@ func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *Conn, dst1 any) ...@@ -247,7 +250,7 @@ func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *Conn, dst1 any)
if contains(err.Error(), d.Continues) { if contains(err.Error(), d.Continues) {
d.setNotAliveConn(q) d.setNotAliveConn(q)
d.logger.Printf("DB_EXEC_WRAPPER_ERR: SRV: %s; %s", q.ToString(), err.Error()) d.logger.Printf("ZDB_EXEC_WRAPPER_ERR: SRV: %s; %s", q.ToString(), err.Error())
continue continue
} else { } else {
return err return err
...@@ -263,12 +266,21 @@ func (d *Pool) Ping(q *Conn) (err error) { ...@@ -263,12 +266,21 @@ func (d *Pool) Ping(q *Conn) (err error) {
var n any var n any
if err = d.WithTimeout(d.PingTimout).qGet(q, &n, "SELECT 1"); err != nil { if err = d.WithTimeout(d.PingTimout).qGet(q, &n, "SELECT 1"); err != nil {
d.logger.Printf("DB_PING_ERR: SRV: %s; %v", q.PingTry++
d.logger.Printf("ZDB_PING_ERR: SRV: %s; TRY: %d; %v",
q.ToString(), q.ToString(),
q.PingTry,
err, err,
) )
if d.PingTry <= q.PingTry {
return nil
}
} }
q.PingTry = 0
return return
} }
......
//go:build !dev
package zdb
func logConnStat(conn *Conn) {
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment