Commit f6f60155 authored by Vladimir Barsukov's avatar Vladimir Barsukov
Browse files

init

parents
Pipeline #37 canceled with stages
* text=auto eol=lf
*.go text eol=lf
.idea
main/
\ No newline at end of file
package zdb
import (
"github.com/jackc/pgx/v5/pgxpool"
)
type conn struct {
*pgxpool.Pool
Mode connMode
Alive bool
}
type connMode int
const (
ConnModeMaster connMode = iota
ConnModeSync
ConnModeAsync
)
func (c connMode) String() string {
switch c {
case ConnModeMaster:
return "master"
case ConnModeSync:
return "sync"
case ConnModeAsync:
return "async"
default:
return "unknown"
}
}
package zdb
func (d *Pool) QExec(q *conn, sql string, args ...any) error {
_, err := q.Exec(d.ctx, sql, args...)
return err
}
func (d *Pool) WExec(sql string, args ...any) error {
return d.QExec(d.srvMaster, sql, args...)
}
func (d *Pool) WExecNamed(sql string, args map[string]any) error {
return d.QExec(d.srvMaster, d.prepare(sql, args))
}
package zdb
import "github.com/georgysavva/scany/v2/pgxscan"
func (d *Pool) QGet(q pgxscan.Querier, dst any, sql string, args ...any) error {
return pgxscan.Get(d.ctx, q, dst, sql, args...)
}
func (d *Pool) Get(dst any, sql string, args ...any) error {
return d.execWrapper(ConnModeSync, dst, func(conn *conn, dst1 any) error {
return d.QGet(conn, dst1, sql, args...)
})
}
func (d *Pool) GetNamed(dst any, sql string, args map[string]any) error {
return d.execWrapper(ConnModeSync, dst, func(conn *conn, dst1 any) error {
return d.QGet(conn, dst1, d.prepare(sql, args))
})
}
func (d *Pool) GetOpts(dst any, opts Opts) error {
sql, args := opts.Opts()
return d.execWrapper(ConnModeSync, dst, func(conn *conn, dst1 any) error {
return d.QGet(conn, dst1, d.prepare(sql, args))
})
}
func (d *Pool) GetAsync(dst any, sql string, args ...any) error {
return d.execWrapper(ConnModeAsync, dst, func(conn *conn, dst1 any) error {
return d.QGet(conn, dst1, sql, args...)
})
}
func (d *Pool) GetAsyncNamed(dst any, sql string, args map[string]any) error {
return d.execWrapper(ConnModeAsync, dst, func(conn *conn, dst1 any) error {
return d.QGet(conn, dst1, d.prepare(sql, args))
})
}
func (d *Pool) WGet(dst any, sql string, args ...any) error {
return d.QGet(d.srvMaster, dst, sql, args...)
}
func (d *Pool) WGetNamed(dst any, sql string, args map[string]any) error {
return d.QGet(d.srvMaster, dst, d.prepare(sql, args))
}
module gitlab.x/barsukov/zdb
go 1.20
require (
github.com/georgysavva/scany/v2 v2.0.0
github.com/jackc/pgx/v5 v5.3.1
)
require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/text v0.9.0 // indirect
)
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/georgysavva/scany/v2 v2.0.0 h1:RGXqxDv4row7/FYoK8MRXAZXqoWF/NM+NP0q50k3DKU=
github.com/georgysavva/scany/v2 v2.0.0/go.mod h1:sigOdh+0qb/+aOs3TVhehVT10p8qJL7K/Zhyz8vWo38=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.3.1 h1:Fcr8QJ1ZeLi5zsPZqQeUZhNhxfkkKBOgJuYkJHoBOtU=
github.com/jackc/pgx/v5 v5.3.1/go.mod h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8=
github.com/jackc/puddle/v2 v2.2.0 h1:RdcDk92EJBuBS55nQMMYFXTxwstHug4jkhT5pq8VxPk=
github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
package zdb
import "github.com/jackc/pgx/v5"
var ErrNoRows = pgx.ErrNoRows
type Opts interface {
Opts() (sql string, args map[string]any)
}
func remove[T comparable](slice []T, s int) []T {
return append(slice[:s], slice[s+1:]...)
}
type Logger interface {
Printf(format string, v ...any)
}
package zdb
import (
"context"
"errors"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
"log"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
)
type Pool struct {
ctx context.Context
logger Logger
srvMaster *conn
srvSlaves []*conn
srvSlavesAsync []*conn
mu *sync.RWMutex
notAliveConns []*conn
slavesIter *atomic.Int64
slavesAsyncIter *atomic.Int64
}
func NewDefault() *Pool {
return &Pool{ctx: context.Background(), logger: log.Default()}
}
func (d *Pool) WithContext(ctx context.Context) *Pool {
return &Pool{
ctx: ctx,
srvMaster: d.srvMaster,
srvSlaves: d.srvSlaves,
srvSlavesAsync: d.srvSlavesAsync,
mu: d.mu,
notAliveConns: d.notAliveConns,
slavesIter: d.slavesIter,
slavesAsyncIter: d.slavesAsyncIter,
}
}
func (d *Pool) NewConn(mode connMode, pgConnString string) error {
if c, err := d.newConn(mode, pgConnString); err != nil {
return err
} else {
switch mode {
case ConnModeMaster:
d.srvMaster = c
case ConnModeSync:
d.srvSlaves = append(d.srvSlaves, c)
case ConnModeAsync:
d.srvSlavesAsync = append(d.srvSlavesAsync, c)
default:
return errors.New("unknown mode")
}
return nil
}
}
func (d *Pool) newConn(mode connMode, pgConnString string) (c *conn, err error) {
var pool *pgxpool.Pool
if !strings.Contains(pgConnString, "default_query_exec_mode=") {
pgConnString += " default_query_exec_mode=simple_protocol"
}
if pool, err = pgxpool.New(d.ctx, pgConnString); err != nil {
return &conn{Pool: pool, Alive: false, Mode: mode}, err
}
c = &conn{Pool: pool, Alive: false, Mode: mode}
if err = d.testConn(c); err != nil {
return c, err
}
c.Alive = true
return c, nil
}
func (d *Pool) slave() *conn {
if len(d.srvSlaves) == 0 {
return d.srvMaster
}
d.mu.RLock()
defer d.mu.RUnlock()
return d.srvSlaves[d.slavesIter.Add(1)%int64(len(d.srvSlaves))]
}
func (d *Pool) slaveAsync() *conn {
if len(d.srvSlavesAsync) == 0 {
return d.slave()
}
d.mu.RLock()
defer d.mu.RUnlock()
return d.srvSlavesAsync[d.slavesAsyncIter.Add(1)%int64(len(d.srvSlavesAsync))]
}
func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *conn, dst1 any) error) error {
for {
var c *conn
if pool == ConnModeSync {
c = d.slave()
} else {
c = d.slaveAsync()
}
if err := f(c, dst); err != nil {
if c.Mode == ConnModeMaster {
return err
} else {
if strings.Contains(err.Error(), "connect") || strings.Contains(err.Error(), "EOF") {
d.logger.Printf("DB_EXEC_WRAPPER_ERR: %s", err.Error())
d.setNotAliveConn(c)
continue
} else {
return err
}
}
}
return nil
}
}
func (d *Pool) testConn(q *conn) (err error) {
var n any
if err = d.QGet(q, &n, "SELECT 1"); err != nil {
d.logger.Printf("DB_TEST_CONN_ERR: MODE: %v, HOST: %s, PORT: %d, ERR: %v",
q.Mode.String(),
q.Config().ConnConfig.Host,
q.Config().ConnConfig.Port,
err,
)
}
return
}
func (d *Pool) setNotAliveConn(conn *conn) {
d.mu.Lock()
defer d.mu.Unlock()
for i, slave := range d.srvSlaves {
if slave == conn {
conn.Alive = false
d.notAliveConns = append(d.notAliveConns, conn)
d.srvSlaves = remove(d.srvSlaves, i)
return
}
}
for i, slave := range d.srvSlavesAsync {
if slave == conn {
conn.Alive = false
d.notAliveConns = append(d.notAliveConns, conn)
d.srvSlavesAsync = remove(d.srvSlavesAsync, i)
return
}
}
}
func (d *Pool) Test() {
go func() {
for {
for i, p := range d.notAliveConns {
if err := d.testConn(p); err == nil {
d.mu.Lock()
p.Alive = true
d.notAliveConns = remove(d.notAliveConns, i)
if p.Mode == ConnModeSync {
d.srvSlaves = append(d.srvSlaves, p)
} else if p.Mode == ConnModeAsync {
d.srvSlavesAsync = append(d.srvSlavesAsync, p)
}
d.mu.Unlock()
}
}
d.srvMaster.Alive = d.testConn(d.srvMaster) == nil
time.Sleep(time.Second * 1)
}
}()
}
func (d *Pool) prepare(sql string, param map[string]any) string {
for n, t := range param {
switch t.(type) {
case time.Time:
sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("'%s'", t.(time.Time).UTC().Format(time.DateTime)))
case *time.Time:
if t.(*time.Time) == nil {
sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
} else {
sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("'%s'", t.(*time.Time).UTC().Format(time.DateTime)))
}
case nil:
sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
case bool:
if t.(bool) {
sql = strings.ReplaceAll(sql, ":"+n+":", "TRUE")
} else {
sql = strings.ReplaceAll(sql, ":"+n+":", "FALSE")
}
case *string:
if t.(*string) == nil || *t.(*string) == "NULL" {
sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
} else {
sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("'%v'", strings.ReplaceAll(*t.(*string), "'", "''")))
}
case string:
if t.(string) == "NULL" {
sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
} else {
sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("'%v'", strings.ReplaceAll(t.(string), "'", "''")))
}
case *int:
if t.(*int) == nil {
sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
} else {
sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("%v", *t.(*int)))
}
case *bool:
if t.(*bool) == nil {
sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
} else {
if *t.(*bool) {
sql = strings.ReplaceAll(sql, ":"+n+":", "TRUE")
} else {
sql = strings.ReplaceAll(sql, ":"+n+":", "FALSE")
}
}
case *int64:
if t.(*int64) == nil {
sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
} else {
sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("%v", *t.(*int64)))
}
case *float64:
if t.(*float64) == nil {
sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
} else {
sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("%v", *t.(*float64)))
}
case int, int64:
sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("%v", t))
default:
switch reflect.TypeOf(t).Kind() {
case reflect.Slice:
sql = strings.ReplaceAll(sql, ":"+n+":", "'{"+strings.Trim(strings.Join(strings.Split(fmt.Sprint(t), " "), ","), "[]")+"}'")
}
sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("'%v'", t))
}
}
if _, ok := param["_debug"]; ok {
d.logger.Printf(sql)
}
return sql
}
package zdb
import "github.com/georgysavva/scany/v2/pgxscan"
func (d *Pool) WSelect(dst any, sql string, args ...any) error {
return d.qSelect(d.srvMaster, dst, sql, args...)
}
func (d *Pool) WSelectNamed(dst any, sql string, args map[string]any) error {
return d.qSelect(d.srvMaster, dst, d.prepare(sql, args))
}
func (d *Pool) WSelectOpts(dst any, opts Opts) error {
sql, args := opts.Opts()
return d.qSelect(d.srvMaster, dst, d.prepare(sql, args))
}
func (d *Pool) Select(dst any, sql string, args ...any) error {
return d.execWrapper(ConnModeSync, dst, func(conn *conn, dst1 any) error {
return d.qSelect(conn, dst1, sql, args...)
})
}
func (d *Pool) SelectNamed(dst any, sql string, args map[string]any) error {
return d.execWrapper(ConnModeSync, dst, func(conn *conn, dst1 any) error {
return d.qSelect(conn, dst1, d.prepare(sql, args))
})
}
func (d *Pool) SelectOpts(dst any, opts Opts) error {
sql, args := opts.Opts()
return d.execWrapper(ConnModeSync, dst, func(conn *conn, dst1 any) error {
return d.qSelect(conn, dst1, d.prepare(sql, args))
})
}
func (d *Pool) SelectAsync(dst any, sql string, args ...any) error {
return d.execWrapper(ConnModeAsync, dst, func(conn *conn, dst1 any) error {
return d.qSelect(conn, dst1, sql, args...)
})
}
func (d *Pool) SelectAsyncNamed(dst any, sql string, args map[string]any) error {
return d.execWrapper(ConnModeAsync, dst, func(conn *conn, dst1 any) error {
return d.qSelect(conn, dst1, d.prepare(sql, args))
})
}
func (d *Pool) SelectAsyncOpts(dst any, opts Opts) error {
sql, args := opts.Opts()
return d.execWrapper(ConnModeAsync, dst, func(conn *conn, dst1 any) error {
return d.qSelect(conn, dst1, d.prepare(sql, args))
})
}
func (d *Pool) qSelect(q *conn, dst any, sql string, args ...any) error {
return pgxscan.Select(d.ctx, q, dst, sql, args...)
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment