Commit 8860ac58 authored by Vladimir Barsukov's avatar Vladimir Barsukov
Browse files

add balance

parent 1c1eb92b
......@@ -13,17 +13,17 @@ func (d *Pool) WAny(dst any, sql string, args ...any) error {
return d.qAny(d.srvMaster, dst, sql, args...)
}
func (d *Pool) Any(dst any, sql string, args ...any) error {
return d.execWrapper(ConnModeSync, dst, func(conn *conn, dst1 any) error {
return d.execWrapper(ConnModeSync, dst, func(conn *Conn, dst1 any) error {
return d.qAny(conn, dst1, sql, args...)
})
}
func (d *Pool) AnyAsync(dst any, sql string, args ...any) error {
return d.execWrapper(ConnModeAsync, dst, func(conn *conn, dst1 any) error {
return d.execWrapper(ConnModeAsync, dst, func(conn *Conn, dst1 any) error {
return d.qAny(conn, dst1, sql, args...)
})
}
func (d *Pool) qAny(q *conn, dst any, sql string, args ...any) error {
func (d *Pool) qAny(q *Conn, dst any, sql string, args ...any) error {
var rows pgx.Rows
var err error
......
......@@ -5,7 +5,7 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
)
type conn struct {
type Conn struct {
*pgxpool.Pool
Mode connMode
......@@ -36,6 +36,6 @@ func (c connMode) String() string {
}
}
func (c conn) ToString() string {
func (c Conn) ToString() string {
return fmt.Sprintf("%s: %s:%d", c.Mode, c.Config().ConnConfig.Host, c.Config().ConnConfig.Port)
}
......@@ -12,7 +12,7 @@ func (d *Pool) WExecOpts(opts Opts) error {
return d.qExec(d.srvMaster, d.prepare(sql, args))
}
func (d *Pool) qExec(q *conn, sql string, args ...any) error {
func (d *Pool) qExec(q *Conn, sql string, args ...any) error {
_, err := q.Exec(d.ctx, sql, args...)
return err
......
......@@ -15,37 +15,37 @@ func (d *Pool) WGetOpts(dst any, opts Opts) error {
}
func (d *Pool) Get(dst any, sql string, args ...any) error {
return d.execWrapper(ConnModeSync, dst, func(q *conn, dst1 any) error {
return d.execWrapper(ConnModeSync, dst, func(q *Conn, dst1 any) error {
return d.qGet(q, dst1, sql, args...)
})
}
func (d *Pool) GetNamed(dst any, sql string, args map[string]any) error {
return d.execWrapper(ConnModeSync, dst, func(q *conn, dst1 any) error {
return d.execWrapper(ConnModeSync, dst, func(q *Conn, dst1 any) error {
return d.qGet(q, dst1, d.prepare(sql, args))
})
}
func (d *Pool) GetOpts(dst any, opts Opts) error {
sql, args := opts.Opts()
return d.execWrapper(ConnModeSync, dst, func(q *conn, dst1 any) error {
return d.execWrapper(ConnModeSync, dst, func(q *Conn, dst1 any) error {
return d.qGet(q, dst1, d.prepare(sql, args))
})
}
func (d *Pool) GetAsync(dst any, sql string, args ...any) error {
return d.execWrapper(ConnModeAsync, dst, func(q *conn, dst1 any) error {
return d.execWrapper(ConnModeAsync, dst, func(q *Conn, dst1 any) error {
return d.qGet(q, dst1, sql, args...)
})
}
func (d *Pool) GetAsyncNamed(dst any, sql string, args map[string]any) error {
return d.execWrapper(ConnModeAsync, dst, func(q *conn, dst1 any) error {
return d.execWrapper(ConnModeAsync, dst, func(q *Conn, dst1 any) error {
return d.qGet(q, dst1, d.prepare(sql, args))
})
}
func (d *Pool) GetAsyncOpts(dst any, opts Opts) error {
sql, args := opts.Opts()
return d.execWrapper(ConnModeAsync, dst, func(q *conn, dst1 any) error {
return d.execWrapper(ConnModeAsync, dst, func(q *Conn, dst1 any) error {
return d.qGet(q, dst1, d.prepare(sql, args))
})
}
......
......@@ -13,17 +13,24 @@ import (
"time"
)
type Balance int
const (
BalanceRoundRobin Balance = iota
BalanceLeastConn
)
type Pool struct {
ctx context.Context
logger Logger
srvMaster *conn
srvSlaves []*conn
srvSlavesAsync []*conn
srvMaster *Conn
srvSync []*Conn
srvAsync []*Conn
mu *sync.RWMutex
notAliveConns []*conn
notAliveConns []*Conn
slavesIter *atomic.Int64
slavesAsyncIter *atomic.Int64
......@@ -36,6 +43,7 @@ type Pool struct {
ContinuesTry []string
TryOnError int
TryOnSleep time.Duration
Balance Balance
}
func New() *Pool {
......@@ -48,6 +56,7 @@ func New() *Pool {
ContinuesTry: []string{"conflict with recovery"},
TryOnError: 1,
TryOnSleep: time.Second,
Balance: BalanceLeastConn,
}
}
......@@ -63,8 +72,8 @@ func (d *Pool) WithContext(ctx context.Context) *Pool {
return &Pool{
ctx: ctx,
srvMaster: d.srvMaster,
srvSlaves: d.srvSlaves,
srvSlavesAsync: d.srvSlavesAsync,
srvSync: d.srvSync,
srvAsync: d.srvAsync,
mu: d.mu,
notAliveConns: d.notAliveConns,
slavesIter: d.slavesIter,
......@@ -82,9 +91,9 @@ func (d *Pool) NewConn(mode connMode, pgConnString string) error {
case ConnModeMaster:
d.srvMaster = q
case ConnModeSync:
d.srvSlaves = append(d.srvSlaves, q)
d.srvSync = append(d.srvSync, q)
case ConnModeAsync:
d.srvSlavesAsync = append(d.srvSlavesAsync, q)
d.srvAsync = append(d.srvAsync, q)
default:
return errors.New("unknown mode")
}
......@@ -102,7 +111,7 @@ func (d *Pool) NewConns(mode connMode, pgConnString ...string) error {
return nil
}
func (d *Pool) newConn(mode connMode, pgConnString string) (q *conn, err error) {
func (d *Pool) newConn(mode connMode, pgConnString string) (q *Conn, err error) {
var pgxPool *pgxpool.Pool
var pgxConfig *pgxpool.Config
......@@ -114,23 +123,11 @@ func (d *Pool) newConn(mode connMode, pgConnString string) (q *conn, err error)
return nil, err
}
//pgxConfig.AfterConnect = func(ctx context.Context, p *pgx.Conn) error {
// j := &pgtype.Type{Name: "jsonb", OID: pgtype.JSONBOID, Codec: ztype.JSONBCodec{}}
// jb := &pgtype.Type{Name: "json", OID: pgtype.JSONOID, Codec: ztype.JSONCodec{}}
//
// p.TypeMap().RegisterType(j)
// p.TypeMap().RegisterType(jb)
// p.TypeMap().RegisterType(&pgtype.Type{Name: "_json", OID: pgtype.JSONArrayOID, Codec: &pgtype.ArrayCodec{ElementType: j}})
// p.TypeMap().RegisterType(&pgtype.Type{Name: "_jsonb", OID: pgtype.JSONBArrayOID, Codec: &pgtype.ArrayCodec{ElementType: jb}})
//
// return nil
//}
if pgxPool, err = pgxpool.NewWithConfig(d.ctx, pgxConfig); err != nil {
return &conn{Pool: pgxPool, Alive: false, Mode: mode}, err
return &Conn{Pool: pgxPool, Alive: false, Mode: mode}, err
}
q = &conn{Pool: pgxPool, Alive: false, Mode: mode}
q = &Conn{Pool: pgxPool, Alive: false, Mode: mode}
if err = d.ping(q); err != nil {
return q, err
......@@ -141,31 +138,67 @@ func (d *Pool) newConn(mode connMode, pgConnString string) (q *conn, err error)
return q, nil
}
func (d *Pool) sync() *conn {
if len(d.srvSlaves) == 0 {
func (d *Pool) sync() *Conn {
if len(d.srvSync) == 0 {
return d.srvMaster
}
d.mu.RLock()
defer d.mu.RUnlock()
return d.srvSlaves[d.slavesIter.Add(1)%int64(len(d.srvSlaves))]
if len(d.srvSync) == 1 {
return d.srvSync[0]
}
if d.Balance == BalanceRoundRobin {
return d.srvSync[d.slavesIter.Add(1)%int64(len(d.srvSync))]
}
var out *Conn
var min int32 = 1 << 30
for _, conn := range d.srvSync {
if conn.Stat().AcquiredConns() < min {
min = conn.Stat().AcquiredConns()
out = conn
}
}
return out
}
func (d *Pool) async() *conn {
if len(d.srvSlavesAsync) == 0 {
func (d *Pool) async() *Conn {
if len(d.srvAsync) == 0 {
return d.sync()
}
d.mu.RLock()
defer d.mu.RUnlock()
return d.srvSlavesAsync[d.slavesAsyncIter.Add(1)%int64(len(d.srvSlavesAsync))]
if len(d.srvAsync) == 1 {
return d.srvAsync[0]
}
if d.Balance == BalanceRoundRobin {
return d.srvAsync[d.slavesAsyncIter.Add(1)%int64(len(d.srvAsync))]
}
var out *Conn
var min int32 = 1 << 30
for _, conn := range d.srvAsync {
if conn.Stat().AcquiredConns() < min {
min = conn.Stat().AcquiredConns()
out = conn
}
}
return out
}
func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *conn, dst1 any) error) error {
func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *Conn, dst1 any) error) error {
for {
var q *conn
var q *Conn
try := 0
if pool == ConnModeSync {
......@@ -202,7 +235,7 @@ func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *conn, dst1 any)
}
}
func (d *Pool) ping(q *conn) (err error) {
func (d *Pool) ping(q *Conn) (err error) {
var n any
if err = d.qGet(q, &n, "SELECT 1"); err != nil {
......@@ -215,24 +248,24 @@ func (d *Pool) ping(q *conn) (err error) {
return
}
func (d *Pool) setNotAliveConn(conn *conn) {
func (d *Pool) setNotAliveConn(conn *Conn) {
d.mu.Lock()
defer d.mu.Unlock()
for i, slave := range d.srvSlaves {
for i, slave := range d.srvSync {
if slave == conn {
conn.Alive = false
d.notAliveConns = append(d.notAliveConns, conn)
d.srvSlaves = remove(d.srvSlaves, i)
d.srvSync = remove(d.srvSync, i)
return
}
}
for i, slave := range d.srvSlavesAsync {
for i, slave := range d.srvAsync {
if slave == conn {
conn.Alive = false
d.notAliveConns = append(d.notAliveConns, conn)
d.srvSlavesAsync = remove(d.srvSlavesAsync, i)
d.srvAsync = remove(d.srvAsync, i)
return
}
......@@ -254,9 +287,9 @@ func (d *Pool) Start() {
q.Alive = true
d.notAliveConns = remove(d.notAliveConns, i)
if q.Mode == ConnModeSync {
d.srvSlaves = append(d.srvSlaves, q)
d.srvSync = append(d.srvSync, q)
} else if q.Mode == ConnModeAsync {
d.srvSlavesAsync = append(d.srvSlavesAsync, q)
d.srvAsync = append(d.srvAsync, q)
}
d.mu.Unlock()
}
......
......@@ -17,41 +17,41 @@ func (d *Pool) WSelectOpts(dst any, opts Opts) error {
}
func (d *Pool) Select(dst any, sql string, args ...any) error {
return d.execWrapper(ConnModeSync, dst, func(conn *conn, dst1 any) error {
return d.execWrapper(ConnModeSync, dst, func(conn *Conn, dst1 any) error {
return d.qSelect(conn, dst1, sql, args...)
})
}
func (d *Pool) SelectNamed(dst any, sql string, args map[string]any) error {
return d.execWrapper(ConnModeSync, dst, func(conn *conn, dst1 any) error {
return d.execWrapper(ConnModeSync, dst, func(conn *Conn, dst1 any) error {
return d.qSelect(conn, dst1, d.prepare(sql, args))
})
}
func (d *Pool) SelectOpts(dst any, opts Opts) error {
sql, args := opts.Opts()
return d.execWrapper(ConnModeSync, dst, func(conn *conn, dst1 any) error {
return d.execWrapper(ConnModeSync, dst, func(conn *Conn, dst1 any) error {
return d.qSelect(conn, dst1, d.prepare(sql, args))
})
}
func (d *Pool) SelectAsync(dst any, sql string, args ...any) error {
return d.execWrapper(ConnModeAsync, dst, func(conn *conn, dst1 any) error {
return d.execWrapper(ConnModeAsync, dst, func(conn *Conn, dst1 any) error {
return d.qSelect(conn, dst1, sql, args...)
})
}
func (d *Pool) SelectAsyncNamed(dst any, sql string, args map[string]any) error {
return d.execWrapper(ConnModeAsync, dst, func(conn *conn, dst1 any) error {
return d.execWrapper(ConnModeAsync, dst, func(conn *Conn, dst1 any) error {
return d.qSelect(conn, dst1, d.prepare(sql, args))
})
}
func (d *Pool) SelectAsyncOpts(dst any, opts Opts) error {
sql, args := opts.Opts()
return d.execWrapper(ConnModeAsync, dst, func(conn *conn, dst1 any) error {
return d.execWrapper(ConnModeAsync, dst, func(conn *Conn, dst1 any) error {
return d.qSelect(conn, dst1, d.prepare(sql, args))
})
}
func (d *Pool) qSelect(q *conn, dst any, sql string, args ...any) error {
func (d *Pool) qSelect(q *Conn, dst any, sql string, args ...any) error {
return pgxscan.Select(d.ctx, q, dst, sql, args...)
}
......@@ -30,7 +30,7 @@ func (d *Pool) StatMaster() *Stat {
func (d *Pool) StatPool() []*Stat {
out := make([]*Stat, 0)
for _, q := range append(d.srvSlaves, d.srvSlavesAsync...) {
for _, q := range append(d.srvSync, d.srvAsync...) {
out = append(out, d.stat(q))
}
......@@ -62,7 +62,7 @@ func (d *Pool) StatPoolTotal() *Stat {
return m
}
func (d *Pool) stat(q *conn) *Stat {
func (d *Pool) stat(q *Conn) *Stat {
s := q.Stat()
return &Stat{
......
package ztype
import (
"database/sql/driver"
"fmt"
"github.com/goccy/go-json"
pgxtype "github.com/jackc/pgx/v5/pgtype"
"reflect"
)
type JSONCodec struct{}
func (JSONCodec) FormatSupported(format int16) bool {
return format == pgxtype.TextFormatCode || format == pgxtype.BinaryFormatCode
}
func (JSONCodec) PreferredFormat() int16 {
return pgxtype.TextFormatCode
}
func (c JSONCodec) PlanEncode(m *pgxtype.Map, oid uint32, format int16, value any) pgxtype.EncodePlan {
switch value.(type) {
case string:
return encodePlanJSONCodecEitherFormatString{}
case []byte:
return encodePlanJSONCodecEitherFormatByteSlice{}
}
// Because anything can be marshalled the normal wrapping in Map.PlanScan doesn't get a chance to run. So try the
// appropriate wrappers here.
for _, f := range []pgxtype.TryWrapEncodePlanFunc{
pgxtype.TryWrapDerefPointerEncodePlan,
pgxtype.TryWrapFindUnderlyingTypeEncodePlan,
} {
if wrapperPlan, nextValue, ok := f(value); ok {
if nextPlan := c.PlanEncode(m, oid, format, nextValue); nextPlan != nil {
wrapperPlan.SetNext(nextPlan)
return wrapperPlan
}
}
}
return encodePlanJSONCodecEitherFormatMarshal{}
}
type encodePlanJSONCodecEitherFormatString struct{}
func (encodePlanJSONCodecEitherFormatString) Encode(value any, buf []byte) (newBuf []byte, err error) {
jsonString := value.(string)
buf = append(buf, jsonString...)
return buf, nil
}
type encodePlanJSONCodecEitherFormatByteSlice struct{}
func (encodePlanJSONCodecEitherFormatByteSlice) Encode(value any, buf []byte) (newBuf []byte, err error) {
jsonBytes := value.([]byte)
if jsonBytes == nil {
return nil, nil
}
buf = append(buf, jsonBytes...)
return buf, nil
}
type encodePlanJSONCodecEitherFormatMarshal struct{}
func (encodePlanJSONCodecEitherFormatMarshal) Encode(value any, buf []byte) (newBuf []byte, err error) {
jsonBytes, err := json.Marshal(value)
if err != nil {
return nil, err
}
buf = append(buf, jsonBytes...)
return buf, nil
}
func (JSONCodec) PlanScan(m *pgxtype.Map, oid uint32, format int16, target any) pgxtype.ScanPlan {
switch target.(type) {
case *string:
return scanPlanAnyToString{}
case *[]byte:
return scanPlanJSONToByteSlice{}
case pgxtype.BytesScanner:
return scanPlanBinaryBytesToBytesScanner{}
}
return scanPlanJSONToJSONUnmarshal{}
}
type scanPlanAnyToString struct{}
func (scanPlanAnyToString) Scan(src []byte, dst any) error {
p := dst.(*string)
*p = string(src)
return nil
}
type scanPlanJSONToByteSlice struct{}
func (scanPlanJSONToByteSlice) Scan(src []byte, dst any) error {
dstBuf := dst.(*[]byte)
if src == nil {
*dstBuf = nil
return nil
}
*dstBuf = make([]byte, len(src))
copy(*dstBuf, src)
return nil
}
type scanPlanJSONToBytesScanner struct{}
func (scanPlanJSONToBytesScanner) Scan(src []byte, dst any) error {
scanner := (dst).(pgxtype.BytesScanner)
return scanner.ScanBytes(src)
}
type scanPlanJSONToJSONUnmarshal struct{}
func (scanPlanJSONToJSONUnmarshal) Scan(src []byte, dst any) error {
if src == nil {
dstValue := reflect.ValueOf(dst)
if dstValue.Kind() == reflect.Ptr {
el := dstValue.Elem()
switch el.Kind() {
case reflect.Ptr, reflect.Slice, reflect.Map:
el.Set(reflect.Zero(el.Type()))
return nil
}
}
return fmt.Errorf("cannot scan NULL into %T", dst)
}
elem := reflect.ValueOf(dst).Elem()
elem.Set(reflect.Zero(elem.Type()))
return json.Unmarshal(src, dst)
}
func (c JSONCodec) DecodeDatabaseSQLValue(m *pgxtype.Map, oid uint32, format int16, src []byte) (driver.Value, error) {
if src == nil {
return nil, nil
}
dstBuf := make([]byte, len(src))
copy(dstBuf, src)
return dstBuf, nil
}
func (c JSONCodec) DecodeValue(m *pgxtype.Map, oid uint32, format int16, src []byte) (any, error) {
if src == nil {
return nil, nil
}
var dst any
err := json.Unmarshal(src, &dst)
return dst, err
}
type scanPlanBinaryBytesToBytesScanner struct{}
func (scanPlanBinaryBytesToBytesScanner) Scan(src []byte, dst any) error {
scanner := (dst).(pgxtype.BytesScanner)
return scanner.ScanBytes(src)
}
package ztype
import (
"database/sql/driver"
"fmt"
"github.com/goccy/go-json"
pgxtype "github.com/jackc/pgx/v5/pgtype"
)
type JSONBCodec struct{}
func (JSONBCodec) FormatSupported(format int16) bool {
return format == pgxtype.TextFormatCode || format == pgxtype.BinaryFormatCode
}
func (JSONBCodec) PreferredFormat() int16 {
return pgxtype.TextFormatCode
}
func (JSONBCodec) PlanEncode(m *pgxtype.Map, oid uint32, format int16, value any) pgxtype.EncodePlan {
switch format {
case pgxtype.BinaryFormatCode:
plan := JSONCodec{}.PlanEncode(m, oid, pgxtype.TextFormatCode, value)
if plan != nil {
return &encodePlanJSONBCodecBinaryWrapper{textPlan: plan}
}
case pgxtype.TextFormatCode:
return JSONCodec{}.PlanEncode(m, oid, format, value)
}
return nil
}
type encodePlanJSONBCodecBinaryWrapper struct {
textPlan pgxtype.EncodePlan
}
func (plan *encodePlanJSONBCodecBinaryWrapper) Encode(value any, buf []byte) (newBuf []byte, err error) {
buf = append(buf, 1)
return plan.textPlan.Encode(value, buf)
}
func (JSONBCodec) PlanScan(m *pgxtype.Map, oid uint32, format int16, target any) pgxtype.ScanPlan {
switch format {
case pgxtype.BinaryFormatCode:
plan := JSONCodec{}.PlanScan(m, oid, pgxtype.TextFormatCode, target)
if plan != nil {
return &scanPlanJSONBCodecBinaryUnwrapper{textPlan: plan}
}
case pgxtype.TextFormatCode:
return JSONCodec{}.PlanScan(m, oid, format, target)
}
return nil
}
type scanPlanJSONBCodecBinaryUnwrapper struct {
textPlan pgxtype.ScanPlan
}
func (plan *scanPlanJSONBCodecBinaryUnwrapper) Scan(src []byte, dst any) error {
if src == nil {
return plan.textPlan.Scan(src, dst)
}
if len(src) == 0 {
return fmt.Errorf("jsonb too short")
}
if src[0] != 1 {
return fmt.Errorf("unknown jsonb version number %d", src[0])
}
return plan.textPlan.Scan(src[1:], dst)
}
func (c JSONBCodec) DecodeDatabaseSQLValue(m *pgxtype.Map, oid uint32, format int16, src []byte) (driver.Value, error) {
if src == nil {
return nil, nil
}
switch format {
case pgxtype.BinaryFormatCode:
if len(src) == 0 {
return nil, fmt.Errorf("jsonb too short")
}
if src[0] != 1 {
return nil, fmt.Errorf("unknown jsonb version number %d", src[0])
}
dstBuf := make([]byte, len(src)-1)
copy(dstBuf, src[1:])
return dstBuf, nil
case pgxtype.TextFormatCode:
dstBuf := make([]byte, len(src))
copy(dstBuf, src)
return dstBuf, nil
default:
return nil, fmt.Errorf("unknown format code: %v", format)
}
}
func (c JSONBCodec) DecodeValue(m *pgxtype.Map, oid uint32, format int16, src []byte) (any, error) {
if src == nil {
return nil, nil
}
switch format {
case pgxtype.BinaryFormatCode:
if len(src) == 0 {
return nil, fmt.Errorf("jsonb too short")
}
if src[0] != 1 {
return nil, fmt.Errorf("unknown jsonb version number %d", src[0])
}
src = src[1:]
case pgxtype.TextFormatCode:
default:
return nil, fmt.Errorf("unknown format code: %v", format)
}
var dst any
err := json.Unmarshal(src, &dst)
return dst, err
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment