pool.go 9.08 KB
Newer Older
Vladimir Barsukov's avatar
Vladimir Barsukov committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package zdb

import (
	"context"
	"errors"
	"fmt"
	"github.com/jackc/pgx/v5/pgxpool"
	"log"
	"reflect"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

// Balance selects how a replica is chosen when more than one is registered
// for a connection mode.
type Balance int

const (
	// BalanceRoundRobin cycles through the registered replicas using a
	// shared atomic counter.
	BalanceRoundRobin Balance = iota
	// BalanceLeastConn picks the replica with the lowest ratio of acquired
	// to maximum pool connections.
	BalanceLeastConn
)

type Pool struct {
	ctx             context.Context
	logger          Logger
	SrvMaster       *Conn
	SrvSync         []*Conn
	SrvAsync        []*Conn
	mu              *sync.RWMutex
	notAliveConns   []*Conn
	slavesIter      *atomic.Int64
	slavesAsyncIter *atomic.Int64
	stop            bool
	Continues       []string
	ContinuesTry    []string
	TryOnError      int
	TryOnSleep      time.Duration
	Balance         Balance
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
39
40
	PingTimout      time.Duration
	PingTry         int
Vladimir Barsukov's avatar
Vladimir Barsukov committed
41
42
43
44
45
46
47
48
}

func New(ctx context.Context) *Pool {
	return &Pool{
		ctx:             ctx,
		mu:              &sync.RWMutex{},
		slavesIter:      &atomic.Int64{},
		slavesAsyncIter: &atomic.Int64{},
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
49
		Continues:       []string{"connect", "EOF", "conflict with recovery", "context deadline exceeded"},
Vladimir Barsukov's avatar
Vladimir Barsukov committed
50
51
52
53
		ContinuesTry:    []string{"conflict with recovery"},
		TryOnError:      1,
		TryOnSleep:      time.Second,
		Balance:         BalanceLeastConn,
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
54
55
		PingTimout:      time.Second * 5,
		PingTry:         5,
Vladimir Barsukov's avatar
Vladimir Barsukov committed
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
	}
}

// NewDefault builds a Pool on top of context.Background() that logs through
// the standard library's default logger.
func NewDefault() *Pool {
	pool := New(context.Background())
	pool.logger = log.Default()
	return pool
}

func (d *Pool) WithContext(ctx context.Context) *Pool {
	return &Pool{
		ctx:             ctx,
		logger:          d.logger,
		SrvMaster:       d.SrvMaster,
		SrvSync:         d.SrvSync,
		SrvAsync:        d.SrvAsync,
		mu:              d.mu,
		notAliveConns:   d.notAliveConns,
		slavesIter:      d.slavesIter,
		slavesAsyncIter: d.slavesAsyncIter,
		stop:            d.stop,
		Continues:       d.Continues,
		ContinuesTry:    d.ContinuesTry,
		TryOnError:      d.TryOnError,
		TryOnSleep:      d.TryOnSleep,
		Balance:         d.Balance,
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
83
84
		PingTimout:      d.PingTimout,
		PingTry:         d.PingTry,
Vladimir Barsukov's avatar
Vladimir Barsukov committed
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
	}
}

// WithTimeout returns a copy of the pool whose context expires dur after the
// call, or earlier if the receiver's context ends first.
//
// The cancel function is intentionally dropped because the derived pool is
// handed to the caller and has no close hook; the context's resources are
// therefore released only when the timeout fires or the parent is cancelled.
func (d *Pool) WithTimeout(dur time.Duration) *Pool {
	timeoutCtx, _ := context.WithTimeout(d.ctx, dur)
	return d.WithContext(timeoutCtx)
}

// WithDeadline returns a copy of the pool whose context expires at dur (an
// absolute point in time, despite the parameter name), or earlier if the
// receiver's context ends first.
//
// The cancel function is intentionally dropped because the derived pool is
// handed to the caller and has no close hook; the context's resources are
// therefore released only when the deadline passes or the parent is cancelled.
func (d *Pool) WithDeadline(dur time.Time) *Pool {
	deadlineCtx, _ := context.WithDeadline(d.ctx, dur)
	return d.WithContext(deadlineCtx)
}

func (d *Pool) NewConn(mode connMode, pgConnString string) error {
	d.mu.Lock()
	defer d.mu.Unlock()

	q, err := d.newConn(mode, pgConnString)

	switch mode {
	case ConnModeMaster:
		d.SrvMaster = q
	case ConnModeSync:
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
110
		q.Index = len(d.SrvSync)
Vladimir Barsukov's avatar
Vladimir Barsukov committed
111
112
		d.SrvSync = append(d.SrvSync, q)
	case ConnModeAsync:
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
113
		q.Index = len(d.SrvAsync)
Vladimir Barsukov's avatar
Vladimir Barsukov committed
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
		d.SrvAsync = append(d.SrvAsync, q)
	default:
		return errors.New("unknown mode")
	}

	return err
}

// NewConns registers every connection string in pgConnString under mode, in
// order, by calling NewConn. It stops at the first failure and returns that
// error; connections registered before the failure stay registered.
func (d *Pool) NewConns(mode connMode, pgConnString ...string) error {
	for i := 0; i < len(pgConnString); i++ {
		err := d.NewConn(mode, pgConnString[i])
		if err != nil {
			return err
		}
	}

	return nil
}

func (d *Pool) newConn(mode connMode, pgConnString string) (q *Conn, err error) {
	var pgxPool *pgxpool.Pool
	var pgxConfig *pgxpool.Config

	if !strings.Contains(pgConnString, "default_query_exec_mode=") {
		pgConnString += " default_query_exec_mode=simple_protocol"
	}

	if !strings.Contains(pgConnString, "connect_timeout=") {
		pgConnString += " connect_timeout=3"
	}

	if !strings.Contains(pgConnString, "sslmode=") {
		pgConnString += " sslmode=disable"
	}

	if pgxConfig, err = pgxpool.ParseConfig(pgConnString); err != nil {
		return nil, err
	}

	if pgxPool, err = pgxpool.NewWithConfig(d.ctx, pgxConfig); err != nil {
		return &Conn{Pool: pgxPool, Alive: false, Mode: mode}, err
	}

	q = &Conn{Pool: pgxPool, Alive: false, Mode: mode}

Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
158
	if err = d.Ping(q); err != nil {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
159
160
161
162
163
164
165
166
		return q, err
	}

	q.Alive = true

	return q, nil
}

Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
// least returns the connection in s whose pool has the lowest ratio of
// acquired to maximum connections; on a tie the earliest one wins. The
// chosen connection is passed to logConnStat before being returned. For an
// empty s the result is nil (and logConnStat is called with nil).
func (d *Pool) least(s []*Conn) *Conn {
	var (
		best      *Conn
		bestRatio float64
	)

	for idx, candidate := range s {
		load := float64(candidate.Stat().AcquiredConns()) / float64(candidate.Stat().MaxConns())

		// The first element always seeds the minimum.
		if idx == 0 || load < bestRatio {
			best, bestRatio = candidate, load
		}
	}

	logConnStat(best)

	return best
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
func (d *Pool) sync() *Conn {
	if len(d.SrvSync) == 0 {
		return d.SrvMaster
	}

	d.mu.RLock()
	defer d.mu.RUnlock()

	if len(d.SrvSync) == 1 {
		return d.SrvSync[0]
	}

	if d.Balance == BalanceRoundRobin {
		return d.SrvSync[d.slavesIter.Add(1)%int64(len(d.SrvSync))]
	}

Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
201
	return d.least(d.SrvSync)
Vladimir Barsukov's avatar
Vladimir Barsukov committed
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
}

func (d *Pool) async() *Conn {
	if len(d.SrvAsync) == 0 {
		return d.sync()
	}

	d.mu.RLock()
	defer d.mu.RUnlock()

	if len(d.SrvAsync) == 1 {
		return d.SrvAsync[0]
	}

	if d.Balance == BalanceRoundRobin {
		return d.SrvAsync[d.slavesAsyncIter.Add(1)%int64(len(d.SrvAsync))]
	}

Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
220
	return d.least(d.SrvAsync)
Vladimir Barsukov's avatar
Vladimir Barsukov committed
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
}

func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *Conn, dst1 any) error) error {
	for {
		var q *Conn
		try := 0

		if pool == ConnModeSync {
			q = d.sync()
		} else {
			q = d.async()
		}

	repeat:
		if err := f(q, dst); err != nil {
			if q.Mode == ConnModeMaster {
				return err
			} else {

Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
240
				if try < d.TryOnError && contains(err.Error(), d.ContinuesTry) {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
241
					try++
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
242
					d.logger.Printf("ZDB_EXEC_WRAPPER_REPEAT_ERR: SRV: %s TRY: %d; %s", q.ToString(), try, err.Error())
Vladimir Barsukov's avatar
Vladimir Barsukov committed
243
244
245
246
247

					time.Sleep(d.TryOnSleep)
					goto repeat
				}

Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
248
				if contains(err.Error(), d.Continues) {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
249
					d.setNotAliveConn(q)
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
250
					d.logger.Printf("ZDB_EXEC_WRAPPER_ERR: SRV: %s; %s", q.ToString(), err.Error())
Vladimir Barsukov's avatar
Vladimir Barsukov committed
251
252
253
254
255
256
257
258
259
260
261
					continue
				} else {
					return err
				}
			}
		}

		return nil
	}
}

Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
262
func (d *Pool) Ping(q *Conn) (err error) {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
263
264
	var n any

Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
265
266
267
268
	if err = d.WithTimeout(d.PingTimout).qGet(q, &n, "SELECT 1"); err != nil {
		q.PingTry++

		d.logger.Printf("ZDB_PING_ERR: SRV: %s; TRY: %d; %v",
Vladimir Barsukov's avatar
Vladimir Barsukov committed
269
			q.ToString(),
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
270
			q.PingTry,
Vladimir Barsukov's avatar
Vladimir Barsukov committed
271
272
			err,
		)
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
273
274
275
276

		if d.PingTry <= q.PingTry {
			return nil
		}
Vladimir Barsukov's avatar
Vladimir Barsukov committed
277
278
	}

Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
279
280
	q.PingTry = 0

Vladimir Barsukov's avatar
Vladimir Barsukov committed
281
282
283
284
285
286
287
288
289
290
291
	return
}

func (d *Pool) setNotAliveConn(conn *Conn) {
	d.mu.Lock()
	defer d.mu.Unlock()

	for i, slave := range d.SrvSync {
		if slave == conn {
			conn.Alive = false
			d.notAliveConns = append(d.notAliveConns, conn)
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
292
			d.SrvSync = remove(d.SrvSync, i)
Vladimir Barsukov's avatar
Vladimir Barsukov committed
293
294
295
296
297
298
299
300
			return
		}
	}

	for i, slave := range d.SrvAsync {
		if slave == conn {
			conn.Alive = false
			d.notAliveConns = append(d.notAliveConns, conn)
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
301
			d.SrvAsync = remove(d.SrvAsync, i)
Vladimir Barsukov's avatar
Vladimir Barsukov committed
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316

			return
		}
	}
}

func (d *Pool) Start() {
	d.stop = false

	go func() {
		for {
			if d.stop {
				return
			}

Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
317
		rep:
Vladimir Barsukov's avatar
Vladimir Barsukov committed
318
			for i, q := range d.notAliveConns {
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
319
				if err := d.Ping(q); err == nil {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
320
321
					d.mu.Lock()
					q.Alive = true
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
322
					d.notAliveConns = remove(d.notAliveConns, i)
Vladimir Barsukov's avatar
Vladimir Barsukov committed
323
324
325
326
327
328
					if q.Mode == ConnModeSync {
						d.SrvSync = append(d.SrvSync, q)
					} else if q.Mode == ConnModeAsync {
						d.SrvAsync = append(d.SrvAsync, q)
					}
					d.mu.Unlock()
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
329
					goto rep
Vladimir Barsukov's avatar
Vladimir Barsukov committed
330
331
332
333
				}
			}

			if d.SrvMaster != nil {
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
334
				d.SrvMaster.Alive = d.Ping(d.SrvMaster) == nil
Vladimir Barsukov's avatar
Vladimir Barsukov committed
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
			}

			time.Sleep(time.Second)
		}
	}()
}

// Stop asks the health-check loop started by Start to exit; the loop checks
// the flag once per iteration. The flag is written under d.mu because it is
// read from the health-check goroutine.
func (d *Pool) Stop() {
	d.mu.Lock()
	d.stop = true
	d.mu.Unlock()
}

// IsAlive reports whether a master connection is registered and currently
// flagged as alive.
func (d *Pool) IsAlive() bool {
	if d.SrvMaster == nil {
		return false
	}

	return d.SrvMaster.Alive
}

func (d *Pool) prepare(sql string, param map[string]any) string {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
351
352
	for n, t1 := range param {
		switch tv := t1.(type) {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
353
		case time.Time:
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
354
			sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("'%s'::timestamptz", tv.Format(time.RFC3339Nano)))
Vladimir Barsukov's avatar
Vladimir Barsukov committed
355
		case *time.Time:
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
356
			if tv == nil {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
357
358
				sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
			} else {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
359
				sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("'%s'::timestamptz", tv.Format(time.RFC3339Nano)))
Vladimir Barsukov's avatar
Vladimir Barsukov committed
360
361
362
363
			}
		case nil:
			sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
		case bool:
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
364
			if tv {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
365
366
367
368
369
				sql = strings.ReplaceAll(sql, ":"+n+":", "TRUE")
			} else {
				sql = strings.ReplaceAll(sql, ":"+n+":", "FALSE")
			}
		case *string:
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
370
			if tv == nil || *tv == "NULL" {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
371
372
				sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
			} else {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
373
				sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("'%v'", strings.ReplaceAll(*tv, "'", "''")))
Vladimir Barsukov's avatar
Vladimir Barsukov committed
374
375
			}
		case string:
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
376
			if tv == "NULL" {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
377
378
				sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
			} else {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
379
				sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("'%v'", strings.ReplaceAll(tv, "'", "''")))
Vladimir Barsukov's avatar
Vladimir Barsukov committed
380
381
			}
		case *int:
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
382
			if tv == nil {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
383
384
				sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
			} else {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
385
				sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("%v", *tv))
Vladimir Barsukov's avatar
Vladimir Barsukov committed
386
387
			}
		case *bool:
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
388
			if tv == nil {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
389
390
				sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
			} else {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
391
				if *tv {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
392
393
394
395
396
397
					sql = strings.ReplaceAll(sql, ":"+n+":", "TRUE")
				} else {
					sql = strings.ReplaceAll(sql, ":"+n+":", "FALSE")
				}
			}
		case *int64:
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
398
			if tv == nil {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
399
400
				sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
			} else {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
401
				sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("%v", *tv))
Vladimir Barsukov's avatar
Vladimir Barsukov committed
402
403
			}
		case *float64:
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
404
			if tv == nil {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
405
406
				sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
			} else {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
407
				sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("%v", *tv))
Vladimir Barsukov's avatar
Vladimir Barsukov committed
408
409
			}
		case int, int64:
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
410
			sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("%v", tv))
Vladimir Barsukov's avatar
Vladimir Barsukov committed
411
		default:
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
412
			switch reflect.TypeOf(tv).Kind() {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
413
			case reflect.Slice, reflect.Array:
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
414
				sql = strings.ReplaceAll(sql, ":"+n+":", "'{"+strings.Trim(strings.Join(strings.Split(fmt.Sprint(tv), " "), ","), "[]")+"}'")
Vladimir Barsukov's avatar
Vladimir Barsukov committed
415
416
			}

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
417
			sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("'%v'", tv))
Vladimir Barsukov's avatar
Vladimir Barsukov committed
418
419
420
		}
	}

Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
	if v, ok := param["_debug"]; ok {
		switch vv := v.(type) {
		case bool:
			if vv {
				d.logger.Printf(sql)
			}
		case int, uint:
			if vv == 1 {
				d.logger.Printf(sql)
			}
		case string:
			if vv == "1" {
				d.logger.Printf(sql)
			}
		}
Vladimir Barsukov's avatar
Vladimir Barsukov committed
436
437
438
439
	}

	return sql
}