Commit 6f2f266e authored by Vladimir Barsukov's avatar Vladimir Barsukov
Browse files

transaction

parent ea225fe0
...@@ -11,6 +11,7 @@ require ( ...@@ -11,6 +11,7 @@ require (
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/jackc/pgx/v5 v5.7.5 github.com/jackc/pgx/v5 v5.7.5
github.com/patrickmn/go-cache v2.1.0+incompatible github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1
golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 golang.org/x/exp v0.0.0-20250718183923-645b1fa84792
golang.org/x/sync v0.16.0 golang.org/x/sync v0.16.0
) )
...@@ -40,11 +41,11 @@ require ( ...@@ -40,11 +41,11 @@ require (
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.3.0 // indirect github.com/ugorji/go/codec v1.3.0 // indirect
go.uber.org/atomic v1.11.0 // indirect go.uber.org/atomic v1.11.0 // indirect
golang.org/x/arch v0.19.0 // indirect golang.org/x/arch v0.20.0 // indirect
golang.org/x/crypto v0.40.0 // indirect golang.org/x/crypto v0.41.0 // indirect
golang.org/x/net v0.42.0 // indirect golang.org/x/net v0.43.0 // indirect
golang.org/x/sys v0.34.0 // indirect golang.org/x/sys v0.35.0 // indirect
golang.org/x/text v0.27.0 // indirect golang.org/x/text v0.28.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect google.golang.org/protobuf v1.36.7 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )
...@@ -109,23 +109,23 @@ github.com/ugorji/go/codec v1.3.0/go.mod h1:pRBVtBSKl77K30Bv8R2P+cLSGaTtex6fsA2W ...@@ -109,23 +109,23 @@ github.com/ugorji/go/codec v1.3.0/go.mod h1:pRBVtBSKl77K30Bv8R2P+cLSGaTtex6fsA2W
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
golang.org/x/arch v0.19.0 h1:LmbDQUodHThXE+htjrnmVD73M//D9GTH6wFZjyDkjyU= golang.org/x/arch v0.20.0 h1:dx1zTU0MAE98U+TQ8BLl7XsJbgze2WnNKF/8tGp/Q6c=
golang.org/x/arch v0.19.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk= golang.org/x/arch v0.20.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk=
golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 h1:R9PFI6EUdfVKgwKjZef7QIwGcBKu86OEFpJ9nUEP2l4= golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 h1:R9PFI6EUdfVKgwKjZef7QIwGcBKu86OEFpJ9nUEP2l4=
golang.org/x/exp v0.0.0-20250718183923-645b1fa84792/go.mod h1:A+z0yzpGtvnG90cToK5n2tu8UJVP2XUATh+r+sfOOOc= golang.org/x/exp v0.0.0-20250718183923-645b1fa84792/go.mod h1:A+z0yzpGtvnG90cToK5n2tu8UJVP2XUATh+r+sfOOOc=
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
......
...@@ -4,7 +4,7 @@ func (d *Pool) WExec(sql string, args ...any) error { ...@@ -4,7 +4,7 @@ func (d *Pool) WExec(sql string, args ...any) error {
return d.qExec(d.SrvMaster, sql, args...) return d.qExec(d.SrvMaster, sql, args...)
} }
func (d *Pool) WExecQty(sql string, args ...any) (int64, error) { func (d *Pool) WExecQty(sql string, args ...any) (int, error) {
return d.qExecQty(d.SrvMaster, sql, args...) return d.qExecQty(d.SrvMaster, sql, args...)
} }
...@@ -14,7 +14,7 @@ func (d *Pool) WExecNamed(sql string, args map[string]any) error { ...@@ -14,7 +14,7 @@ func (d *Pool) WExecNamed(sql string, args map[string]any) error {
return d.qExec(d.SrvMaster, newSql, newArgs...) return d.qExec(d.SrvMaster, newSql, newArgs...)
} }
func (d *Pool) WExecNamedQty(sql string, args map[string]any) (int64, error) { func (d *Pool) WExecNamedQty(sql string, args map[string]any) (int, error) {
newSql, newArgs := d.prepare(sql, args) newSql, newArgs := d.prepare(sql, args)
return d.qExecQty(d.SrvMaster, newSql, newArgs...) return d.qExecQty(d.SrvMaster, newSql, newArgs...)
...@@ -27,7 +27,7 @@ func (d *Pool) WExecOpts(opts Opts) error { ...@@ -27,7 +27,7 @@ func (d *Pool) WExecOpts(opts Opts) error {
return d.qExec(d.SrvMaster, newSql, newArgs...) return d.qExec(d.SrvMaster, newSql, newArgs...)
} }
func (d *Pool) WExecOptsQty(opts Opts) (int64, error) { func (d *Pool) WExecOptsQty(opts Opts) (int, error) {
sql, args := opts.Opts() sql, args := opts.Opts()
newSql, newArgs := d.prepare(sql, args) newSql, newArgs := d.prepare(sql, args)
...@@ -40,11 +40,11 @@ func (d *Pool) qExec(q *Conn, sql string, args ...any) error { ...@@ -40,11 +40,11 @@ func (d *Pool) qExec(q *Conn, sql string, args ...any) error {
return err return err
} }
func (d *Pool) qExecQty(q *Conn, sql string, args ...any) (int64, error) { func (d *Pool) qExecQty(q *Conn, sql string, args ...any) (int, error) {
s, err := q.Exec(d.ctx, sql, args...) s, err := q.Exec(d.ctx, sql, args...)
if err != nil { if err != nil {
return 0, err return 0, err
} }
return s.RowsAffected(), nil return int(s.RowsAffected()), nil
} }
...@@ -12,21 +12,12 @@ import ( ...@@ -12,21 +12,12 @@ import (
"time" "time"
) )
type Balance int
const (
BalanceRoundRobin Balance = iota
BalanceLeastConn
)
type Pool struct { type Pool struct {
ctx context.Context
logger Logger
SrvMaster *Conn SrvMaster *Conn
SrvSync []*Conn SrvSync []*Conn
SrvAsync []*Conn SrvAsync []*Conn
mu *sync.RWMutex
notAliveConns []*Conn notAliveConns []*Conn
slavesIter *atomic.Int64 slavesIter *atomic.Int64
slavesAsyncIter *atomic.Int64 slavesAsyncIter *atomic.Int64
stop bool stop bool
...@@ -34,10 +25,13 @@ type Pool struct { ...@@ -34,10 +25,13 @@ type Pool struct {
ContinuesTry []string ContinuesTry []string
TryOnError int TryOnError int
TryOnSleep time.Duration TryOnSleep time.Duration
Balance Balance PingTimeout time.Duration
PingTimout time.Duration
PingTry int PingTry int
Debug bool Debug bool
ctx context.Context
logger Logger
mu *sync.RWMutex
} }
func (d *Pool) WithContext(ctx context.Context) *Pool { func (d *Pool) WithContext(ctx context.Context) *Pool {
...@@ -56,8 +50,7 @@ func (d *Pool) WithContext(ctx context.Context) *Pool { ...@@ -56,8 +50,7 @@ func (d *Pool) WithContext(ctx context.Context) *Pool {
ContinuesTry: d.ContinuesTry, ContinuesTry: d.ContinuesTry,
TryOnError: d.TryOnError, TryOnError: d.TryOnError,
TryOnSleep: d.TryOnSleep, TryOnSleep: d.TryOnSleep,
Balance: d.Balance, PingTimeout: d.PingTimeout,
PingTimout: d.PingTimout,
PingTry: d.PingTry, PingTry: d.PingTry,
} }
} }
...@@ -184,13 +177,6 @@ func (d *Pool) sync() *Conn { ...@@ -184,13 +177,6 @@ func (d *Pool) sync() *Conn {
return d.SrvSync[0] return d.SrvSync[0]
} }
if d.Balance == BalanceRoundRobin {
conn := d.SrvSync[d.slavesIter.Add(1)%int64(len(d.SrvSync))]
logConnStat(conn)
return conn
}
return d.least(d.SrvSync) return d.least(d.SrvSync)
} }
...@@ -206,13 +192,6 @@ func (d *Pool) async() *Conn { ...@@ -206,13 +192,6 @@ func (d *Pool) async() *Conn {
return d.SrvAsync[0] return d.SrvAsync[0]
} }
if d.Balance == BalanceRoundRobin {
conn := d.SrvAsync[d.slavesAsyncIter.Add(1)%int64(len(d.SrvAsync))]
logConnStat(conn)
return conn
}
return d.least(d.SrvAsync) return d.least(d.SrvAsync)
} }
...@@ -259,7 +238,7 @@ func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *Conn, dst1 any) ...@@ -259,7 +238,7 @@ func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *Conn, dst1 any)
func (d *Pool) Ping(q *Conn) (err error) { func (d *Pool) Ping(q *Conn) (err error) {
var n any var n any
if err = d.WithTimeout(d.PingTimout).qGet(q, &n, "SELECT 1"); err != nil { if err = d.WithTimeout(d.PingTimeout).qGet(q, &n, "SELECT 1"); err != nil {
q.PingTry++ q.PingTry++
d.logger.Printf("ZDB_PING_ERR: SRV: %s; TRY: %d; %v", d.logger.Printf("ZDB_PING_ERR: SRV: %s; TRY: %d; %v",
......
//go:build !zgo_debug //go:build !zdb_debug
package zdb package zdb
......
package zdb
import (
"context"
"github.com/georgysavva/scany/v2/pgxscan"
"github.com/jackc/pgx/v5"
"github.com/pkg/errors"
)
type Tx struct {
tx pgx.Tx
ctx context.Context
pool *Pool
}
func (d *Pool) Tx() (*Tx, error) {
return d.TxNew(d.ctx)
}
func (d *Pool) TxNew(ctx context.Context) (*Tx, error) {
pgTx, err := d.SrvMaster.Begin(ctx)
return &Tx{tx: pgTx, ctx: ctx, pool: d}, err
}
func (d *Pool) MustTx() *Tx {
tx, _ := d.TxNew(d.ctx)
return tx
}
func (t *Tx) Rollback() error {
return t.tx.Rollback(t.ctx)
}
func (t *Tx) Commit() error {
return t.tx.Commit(t.ctx)
}
func (t *Tx) Invoke(fn func(*Tx) error) error {
if err := fn(t); err != nil {
if err1 := t.Rollback(); err1 != nil {
return errors.Wrap(err1, err.Error())
}
return err
}
return t.Commit()
}
func (t *Tx) Get(dst any, sql string, args ...any) error {
return pgxscan.Get(t.ctx, t.tx, dst, sql, args...)
}
func (t *Tx) GetNamed(dst any, sql string, args map[string]any) error {
newSql, newArgs := t.pool.prepare(sql, args)
return t.Get(dst, newSql, newArgs...)
}
func (t *Tx) GetOpts(dst any, opts Opts) error {
sql, args := opts.Opts()
newSql, newArgs := t.pool.prepare(sql, args)
return t.Get(dst, newSql, newArgs...)
}
func (t *Tx) Select(dst any, sql string, args ...any) error {
return pgxscan.Select(t.ctx, t.tx, dst, sql, args...)
}
func (t *Tx) SelectNamed(dst any, sql string, args map[string]any) error {
newSql, newArgs := t.pool.prepare(sql, args)
return t.Select(dst, newSql, newArgs...)
}
func (t *Tx) SelectOpts(dst any, opts Opts) error {
sql, args := opts.Opts()
newSql, newArgs := t.pool.prepare(sql, args)
return t.Select(dst, newSql, newArgs...)
}
func (t *Tx) Exec(sql string, args ...any) error {
_, err := t.ExecQty(sql, args...)
return err
}
func (t *Tx) ExecNamed(sql string, args map[string]any) error {
_, err := t.ExecNamedQty(sql, args)
return err
}
func (t *Tx) ExecOpts(opts Opts) error {
_, err := t.ExecOptsQty(opts)
return err
}
func (t *Tx) ExecQty(sql string, args ...any) (int, error) {
s, err := t.tx.Exec(t.ctx, sql, args...)
if err != nil {
return 0, err
}
return int(s.RowsAffected()), nil
}
func (t *Tx) ExecNamedQty(sql string, args map[string]any) (int, error) {
newSql, newArgs := t.pool.prepare(sql, args)
return t.ExecQty(newSql, newArgs)
}
func (t *Tx) ExecOptsQty(opts Opts) (int, error) {
sql, args := opts.Opts()
newSql, newArgs := t.pool.prepare(sql, args)
return t.ExecQty(newSql, newArgs)
}
...@@ -18,8 +18,7 @@ func New(ctx context.Context) *Pool { ...@@ -18,8 +18,7 @@ func New(ctx context.Context) *Pool {
ContinuesTry: []string{"conflict with recovery"}, ContinuesTry: []string{"conflict with recovery"},
TryOnError: 1, TryOnError: 1,
TryOnSleep: time.Second, TryOnSleep: time.Second,
Balance: BalanceLeastConn, PingTimeout: time.Second * 5,
PingTimout: time.Second * 5,
PingTry: 5, PingTry: 5,
} }
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment