Commit a91ab05c authored by Vladimir Barsukov's avatar Vladimir Barsukov
Browse files

timeout

parent 53004512
......@@ -10,7 +10,7 @@ import (
)
func (d *Pool) WAny(dst any, sql string, args ...any) error {
return d.qAny(d.srvMaster, dst, sql, args...)
return d.qAny(d.SrvMaster, dst, sql, args...)
}
func (d *Pool) Any(dst any, sql string, args ...any) error {
return d.execWrapper(ConnModeSync, dst, func(conn *Conn, dst1 any) error {
......
package zdb
func (d *Pool) WExec(sql string, args ...any) error {
return d.qExec(d.srvMaster, sql, args...)
return d.qExec(d.SrvMaster, sql, args...)
}
func (d *Pool) WExecNamed(sql string, args map[string]any) error {
return d.qExec(d.srvMaster, d.prepare(sql, args))
return d.qExec(d.SrvMaster, d.prepare(sql, args))
}
func (d *Pool) WExecOpts(opts Opts) error {
sql, args := opts.Opts()
return d.qExec(d.srvMaster, d.prepare(sql, args))
return d.qExec(d.SrvMaster, d.prepare(sql, args))
}
func (d *Pool) qExec(q *Conn, sql string, args ...any) error {
......
......@@ -3,15 +3,15 @@ package zdb
import "github.com/georgysavva/scany/v2/pgxscan"
func (d *Pool) WGet(dst any, sql string, args ...any) error {
return d.qGet(d.srvMaster, dst, sql, args...)
return d.qGet(d.SrvMaster, dst, sql, args...)
}
func (d *Pool) WGetNamed(dst any, sql string, args map[string]any) error {
return d.qGet(d.srvMaster, dst, d.prepare(sql, args))
return d.qGet(d.SrvMaster, dst, d.prepare(sql, args))
}
func (d *Pool) WGetOpts(dst any, opts Opts) error {
sql, args := opts.Opts()
return d.qGet(d.srvMaster, dst, d.prepare(sql, args))
return d.qGet(d.SrvMaster, dst, d.prepare(sql, args))
}
func (d *Pool) Get(dst any, sql string, args ...any) error {
......
......@@ -5,14 +5,14 @@ go 1.20
require (
github.com/georgysavva/scany/v2 v2.0.0
github.com/google/uuid v1.3.0
github.com/jackc/pgx/v5 v5.3.1
github.com/jackc/pgx/v5 v5.4.2
)
require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/text v0.9.0 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/text v0.11.0 // indirect
)
......@@ -10,10 +10,10 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.3.1 h1:Fcr8QJ1ZeLi5zsPZqQeUZhNhxfkkKBOgJuYkJHoBOtU=
github.com/jackc/pgx/v5 v5.3.1/go.mod h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8=
github.com/jackc/puddle/v2 v2.2.0 h1:RdcDk92EJBuBS55nQMMYFXTxwstHug4jkhT5pq8VxPk=
github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jackc/pgx/v5 v5.4.2 h1:u1gmGDwbdRUZiwisBm/Ky2M14uQyUP65bG8+20nnyrg=
github.com/jackc/pgx/v5 v5.4.2/go.mod h1:q6iHT8uDNXWiFNOlRqJzBTaSH3+2xCXkokxHZC5qWFY=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/lib/pq v1.10.0 h1:Zx5DJFEYQXio93kgXnQ09fXNiUKsqv4OUEu2UtGcB1E=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
......@@ -23,13 +23,13 @@ github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
......@@ -21,33 +21,27 @@ const (
)
type Pool struct {
ctx context.Context
logger Logger
srvMaster *Conn
srvSync []*Conn
srvAsync []*Conn
mu *sync.RWMutex
notAliveConns []*Conn
ctx context.Context
logger Logger
SrvMaster *Conn
SrvSync []*Conn
SrvAsync []*Conn
mu *sync.RWMutex
notAliveConns []*Conn
slavesIter *atomic.Int64
slavesAsyncIter *atomic.Int64
stop bool
PgTsFormat string
Continues []string
ContinuesTry []string
TryOnError int
TryOnSleep time.Duration
Balance Balance
stop bool
PgTsFormat string
Continues []string
ContinuesTry []string
TryOnError int
TryOnSleep time.Duration
Balance Balance
}
func New() *Pool {
func New(ctx context.Context) *Pool {
return &Pool{
ctx: ctx,
mu: &sync.RWMutex{},
slavesIter: &atomic.Int64{},
slavesAsyncIter: &atomic.Int64{},
......@@ -61,8 +55,7 @@ func New() *Pool {
}
func NewDefault() *Pool {
p := New()
p.ctx = context.Background()
p := New(context.Background())
p.logger = log.Default()
return p
......@@ -71,16 +64,38 @@ func NewDefault() *Pool {
func (d *Pool) WithContext(ctx context.Context) *Pool {
return &Pool{
ctx: ctx,
srvMaster: d.srvMaster,
srvSync: d.srvSync,
srvAsync: d.srvAsync,
logger: d.logger,
SrvMaster: d.SrvMaster,
SrvSync: d.SrvSync,
SrvAsync: d.SrvAsync,
mu: d.mu,
notAliveConns: d.notAliveConns,
slavesIter: d.slavesIter,
slavesAsyncIter: d.slavesAsyncIter,
stop: d.stop,
PgTsFormat: d.PgTsFormat,
Continues: d.Continues,
ContinuesTry: d.ContinuesTry,
TryOnError: d.TryOnError,
TryOnSleep: d.TryOnSleep,
Balance: d.Balance,
}
}
func (d *Pool) WithTimeout(dur time.Duration) *Pool {
ctx, cancel := context.WithTimeout(d.ctx, dur)
defer cancel()
return d.WithContext(ctx)
}
func (d *Pool) WithDeadline(dur time.Time) *Pool {
ctx, cancel := context.WithDeadline(d.ctx, dur)
defer cancel()
return d.WithContext(ctx)
}
func (d *Pool) NewConn(mode connMode, pgConnString string) error {
d.mu.Lock()
defer d.mu.Unlock()
......@@ -89,11 +104,11 @@ func (d *Pool) NewConn(mode connMode, pgConnString string) error {
switch mode {
case ConnModeMaster:
d.srvMaster = q
d.SrvMaster = q
case ConnModeSync:
d.srvSync = append(d.srvSync, q)
d.SrvSync = append(d.SrvSync, q)
case ConnModeAsync:
d.srvAsync = append(d.srvAsync, q)
d.SrvAsync = append(d.SrvAsync, q)
default:
return errors.New("unknown mode")
}
......@@ -119,6 +134,10 @@ func (d *Pool) newConn(mode connMode, pgConnString string) (q *Conn, err error)
pgConnString += " default_query_exec_mode=simple_protocol"
}
if !strings.Contains(pgConnString, "connect_timeout=") {
pgConnString += " connect_timeout=3"
}
if pgxConfig, err = pgxpool.ParseConfig(pgConnString); err != nil {
return nil, err
}
......@@ -139,25 +158,25 @@ func (d *Pool) newConn(mode connMode, pgConnString string) (q *Conn, err error)
}
func (d *Pool) sync() *Conn {
if len(d.srvSync) == 0 {
return d.srvMaster
if len(d.SrvSync) == 0 {
return d.SrvMaster
}
d.mu.RLock()
defer d.mu.RUnlock()
if len(d.srvSync) == 1 {
return d.srvSync[0]
if len(d.SrvSync) == 1 {
return d.SrvSync[0]
}
if d.Balance == BalanceRoundRobin {
return d.srvSync[d.slavesIter.Add(1)%int64(len(d.srvSync))]
return d.SrvSync[d.slavesIter.Add(1)%int64(len(d.SrvSync))]
}
var out *Conn
var min int32 = 1 << 30
for _, conn := range d.srvSync {
for _, conn := range d.SrvSync {
if conn.Stat().AcquiredConns() < min {
min = conn.Stat().AcquiredConns()
out = conn
......@@ -168,25 +187,25 @@ func (d *Pool) sync() *Conn {
}
func (d *Pool) async() *Conn {
if len(d.srvAsync) == 0 {
if len(d.SrvAsync) == 0 {
return d.sync()
}
d.mu.RLock()
defer d.mu.RUnlock()
if len(d.srvAsync) == 1 {
return d.srvAsync[0]
if len(d.SrvAsync) == 1 {
return d.SrvAsync[0]
}
if d.Balance == BalanceRoundRobin {
return d.srvAsync[d.slavesAsyncIter.Add(1)%int64(len(d.srvAsync))]
return d.SrvAsync[d.slavesAsyncIter.Add(1)%int64(len(d.SrvAsync))]
}
var out *Conn
var min int32 = 1 << 30
for _, conn := range d.srvAsync {
for _, conn := range d.SrvAsync {
if conn.Stat().AcquiredConns() < min {
min = conn.Stat().AcquiredConns()
out = conn
......@@ -252,20 +271,20 @@ func (d *Pool) setNotAliveConn(conn *Conn) {
d.mu.Lock()
defer d.mu.Unlock()
for i, slave := range d.srvSync {
for i, slave := range d.SrvSync {
if slave == conn {
conn.Alive = false
d.notAliveConns = append(d.notAliveConns, conn)
d.srvSync = remove(d.srvSync, i)
d.SrvSync = remove(d.SrvSync, i)
return
}
}
for i, slave := range d.srvAsync {
for i, slave := range d.SrvAsync {
if slave == conn {
conn.Alive = false
d.notAliveConns = append(d.notAliveConns, conn)
d.srvAsync = remove(d.srvAsync, i)
d.SrvAsync = remove(d.SrvAsync, i)
return
}
......@@ -287,16 +306,16 @@ func (d *Pool) Start() {
q.Alive = true
d.notAliveConns = remove(d.notAliveConns, i)
if q.Mode == ConnModeSync {
d.srvSync = append(d.srvSync, q)
d.SrvSync = append(d.SrvSync, q)
} else if q.Mode == ConnModeAsync {
d.srvAsync = append(d.srvAsync, q)
d.SrvAsync = append(d.SrvAsync, q)
}
d.mu.Unlock()
}
}
if d.srvMaster != nil {
d.srvMaster.Alive = d.ping(d.srvMaster) == nil
if d.SrvMaster != nil {
d.SrvMaster.Alive = d.ping(d.SrvMaster) == nil
}
time.Sleep(time.Second)
......@@ -309,7 +328,7 @@ func (d *Pool) Stop() {
}
func (d *Pool) IsAlive() bool {
return d.srvMaster != nil && d.srvMaster.Alive
return d.SrvMaster != nil && d.SrvMaster.Alive
}
func (d *Pool) prepare(sql string, param map[string]any) string {
......
......@@ -5,15 +5,15 @@ import (
)
func (d *Pool) WSelect(dst any, sql string, args ...any) error {
return d.qSelect(d.srvMaster, dst, sql, args...)
return d.qSelect(d.SrvMaster, dst, sql, args...)
}
func (d *Pool) WSelectNamed(dst any, sql string, args map[string]any) error {
return d.qSelect(d.srvMaster, dst, d.prepare(sql, args))
return d.qSelect(d.SrvMaster, dst, d.prepare(sql, args))
}
func (d *Pool) WSelectOpts(dst any, opts Opts) error {
sql, args := opts.Opts()
return d.qSelect(d.srvMaster, dst, d.prepare(sql, args))
return d.qSelect(d.SrvMaster, dst, d.prepare(sql, args))
}
func (d *Pool) Select(dst any, sql string, args ...any) error {
......
......@@ -25,17 +25,17 @@ type Stat struct {
}
func (d *Pool) StatMaster() *Stat {
return d.stat(d.srvMaster)
return d.Stat(d.SrvMaster)
}
func (d *Pool) StatPool() []*Stat {
out := make([]*Stat, 0)
for _, q := range append(d.srvSync, d.srvAsync...) {
out = append(out, d.stat(q))
for _, q := range append(d.SrvSync, d.SrvAsync...) {
out = append(out, d.Stat(q))
}
if d.srvMaster != nil {
return append(out, d.stat(d.srvMaster))
if d.SrvMaster != nil {
return append(out, d.Stat(d.SrvMaster))
}
return out
......@@ -62,7 +62,7 @@ func (d *Pool) StatPoolTotal() *Stat {
return m
}
func (d *Pool) stat(q *Conn) *Stat {
func (d *Pool) Stat(q *Conn) *Stat {
s := q.Stat()
return &Stat{
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment