Commit 5afc3dc5 authored by Vladimir Barsukov's avatar Vladimir Barsukov
Browse files

save

parent bf385ebd
Pipeline #40 canceled with stages
...@@ -29,8 +29,20 @@ type Pool struct { ...@@ -29,8 +29,20 @@ type Pool struct {
slavesAsyncIter *atomic.Int64 slavesAsyncIter *atomic.Int64
} }
func New() *Pool {
return &Pool{
mu: &sync.RWMutex{},
slavesIter: &atomic.Int64{},
slavesAsyncIter: &atomic.Int64{},
}
}
func NewDefault() *Pool { func NewDefault() *Pool {
return &Pool{ctx: context.Background(), logger: log.Default()} p := New()
p.ctx = context.Background()
p.logger = log.Default()
return p
} }
func (d *Pool) WithContext(ctx context.Context) *Pool { func (d *Pool) WithContext(ctx context.Context) *Pool {
...@@ -47,22 +59,23 @@ func (d *Pool) WithContext(ctx context.Context) *Pool { ...@@ -47,22 +59,23 @@ func (d *Pool) WithContext(ctx context.Context) *Pool {
} }
func (d *Pool) NewConn(mode connMode, pgConnString string) error { func (d *Pool) NewConn(mode connMode, pgConnString string) error {
if c, err := d.newConn(mode, pgConnString); err != nil { d.mu.Lock()
return err defer d.mu.Unlock()
} else {
switch mode {
case ConnModeMaster:
d.srvMaster = c
case ConnModeSync:
d.srvSlaves = append(d.srvSlaves, c)
case ConnModeAsync:
d.srvSlavesAsync = append(d.srvSlavesAsync, c)
default:
return errors.New("unknown mode")
}
return nil q, err := d.newConn(mode, pgConnString)
switch mode {
case ConnModeMaster:
d.srvMaster = q
case ConnModeSync:
d.srvSlaves = append(d.srvSlaves, q)
case ConnModeAsync:
d.srvSlavesAsync = append(d.srvSlavesAsync, q)
default:
return errors.New("unknown mode")
} }
return err
} }
func (d *Pool) newConn(mode connMode, pgConnString string) (q *conn, err error) { func (d *Pool) newConn(mode connMode, pgConnString string) (q *conn, err error) {
var pgxPool *pgxpool.Pool var pgxPool *pgxpool.Pool
...@@ -77,7 +90,7 @@ func (d *Pool) newConn(mode connMode, pgConnString string) (q *conn, err error) ...@@ -77,7 +90,7 @@ func (d *Pool) newConn(mode connMode, pgConnString string) (q *conn, err error)
q = &conn{Pool: pgxPool, Alive: false, Mode: mode} q = &conn{Pool: pgxPool, Alive: false, Mode: mode}
if err = d.testConn(q); err != nil { if err = d.ping(q); err != nil {
return q, err return q, err
} }
...@@ -136,11 +149,11 @@ func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *conn, dst1 any) ...@@ -136,11 +149,11 @@ func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *conn, dst1 any)
} }
} }
func (d *Pool) testConn(q *conn) (err error) { func (d *Pool) ping(q *conn) (err error) {
var n any var n any
if err = d.qGet(q, &n, "SELECT 1"); err != nil { if err = d.qGet(q, &n, "SELECT 1"); err != nil {
d.logger.Printf("DB_TEST_CONN_ERR: MODE: %v, HOST: %s, PORT: %d, ERR: %v", d.logger.Printf("DB_PING_ERR: MODE: %v, HOST: %s, PORT: %d, ERR: %v",
q.Mode.String(), q.Mode.String(),
q.Config().ConnConfig.Host, q.Config().ConnConfig.Host,
q.Config().ConnConfig.Port, q.Config().ConnConfig.Port,
...@@ -179,7 +192,7 @@ func (d *Pool) Test() { ...@@ -179,7 +192,7 @@ func (d *Pool) Test() {
go func() { go func() {
for { for {
for i, q := range d.notAliveConns { for i, q := range d.notAliveConns {
if err := d.testConn(q); err == nil { if err := d.ping(q); err == nil {
d.mu.Lock() d.mu.Lock()
q.Alive = true q.Alive = true
d.notAliveConns = remove(d.notAliveConns, i) d.notAliveConns = remove(d.notAliveConns, i)
...@@ -192,7 +205,7 @@ func (d *Pool) Test() { ...@@ -192,7 +205,7 @@ func (d *Pool) Test() {
} }
} }
d.srvMaster.Alive = d.testConn(d.srvMaster) == nil d.srvMaster.Alive = d.ping(d.srvMaster) == nil
time.Sleep(time.Second * 1) time.Sleep(time.Second * 1)
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment