pool.go 8.68 KB
Newer Older
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package zdb

import (
	"context"
	"errors"
	"fmt"
	"github.com/jackc/pgx/v5/pgxpool"
	"log"
	"reflect"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

Vladimir Barsukov's avatar
Vladimir Barsukov committed
16
17
18
19
20
21
22
// Balance selects how a replica is picked when more than one is registered.
type Balance int

// Replica selection strategies.
const (
	// BalanceRoundRobin hands out replicas in rotation.
	BalanceRoundRobin Balance = 0
	// BalanceLeastConn prefers the replica with the fewest acquired connections.
	BalanceLeastConn Balance = 1
)

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
23
type Pool struct {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
24
25
26
27
28
29
30
	ctx             context.Context
	logger          Logger
	SrvMaster       *Conn
	SrvSync         []*Conn
	SrvAsync        []*Conn
	mu              *sync.RWMutex
	notAliveConns   []*Conn
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
31
32
	slavesIter      *atomic.Int64
	slavesAsyncIter *atomic.Int64
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
33
34
35
36
37
38
39
	stop            bool
	PgTsFormat      string
	Continues       []string
	ContinuesTry    []string
	TryOnError      int
	TryOnSleep      time.Duration
	Balance         Balance
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
40
41
}

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
42
func New(ctx context.Context) *Pool {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
43
	return &Pool{
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
44
		ctx:             ctx,
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
45
46
47
		mu:              &sync.RWMutex{},
		slavesIter:      &atomic.Int64{},
		slavesAsyncIter: &atomic.Int64{},
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
48
		PgTsFormat:      "2006-01-02 15:04:05",
Vladimir Barsukov's avatar
Vladimir Barsukov committed
49
50
51
		Continues:       []string{"connect", "EOF", "conflict with recovery"},
		ContinuesTry:    []string{"conflict with recovery"},
		TryOnError:      1,
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
52
		TryOnSleep:      time.Second,
Vladimir Barsukov's avatar
Vladimir Barsukov committed
53
		Balance:         BalanceLeastConn,
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
54
55
56
	}
}

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
57
func NewDefault() *Pool {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
58
	p := New(context.Background())
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
59
60
61
	p.logger = log.Default()

	return p
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
62
63
64
65
66
}

func (d *Pool) WithContext(ctx context.Context) *Pool {
	return &Pool{
		ctx:             ctx,
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
67
68
69
70
		logger:          d.logger,
		SrvMaster:       d.SrvMaster,
		SrvSync:         d.SrvSync,
		SrvAsync:        d.SrvAsync,
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
71
72
73
74
		mu:              d.mu,
		notAliveConns:   d.notAliveConns,
		slavesIter:      d.slavesIter,
		slavesAsyncIter: d.slavesAsyncIter,
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
75
76
77
78
79
80
81
		stop:            d.stop,
		PgTsFormat:      d.PgTsFormat,
		Continues:       d.Continues,
		ContinuesTry:    d.ContinuesTry,
		TryOnError:      d.TryOnError,
		TryOnSleep:      d.TryOnSleep,
		Balance:         d.Balance,
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
82
83
84
	}
}

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// WithTimeout returns a copy of the pool whose context is cancelled once dur
// has elapsed (or earlier, if the receiver's context is cancelled).
func (d *Pool) WithTimeout(dur time.Duration) *Pool {
	ctx, cancel := context.WithTimeout(d.ctx, dur)
	// cancel is deliberately not called here: deferring it would cancel ctx
	// before the caller could run a single statement on the returned pool.
	// The signature cannot hand cancel back, so the context's resources are
	// released when the timeout fires or the parent context is cancelled.
	_ = cancel

	return d.WithContext(ctx)
}

// WithDeadline returns a copy of the pool whose context is cancelled at dur
// (or earlier, if the receiver's context is cancelled).
func (d *Pool) WithDeadline(dur time.Time) *Pool {
	ctx, cancel := context.WithDeadline(d.ctx, dur)
	// cancel is deliberately not called here: deferring it would cancel ctx
	// before the caller could run a single statement on the returned pool.
	// The signature cannot hand cancel back, so the context's resources are
	// released when the deadline passes or the parent context is cancelled.
	_ = cancel

	return d.WithContext(ctx)
}

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
99
func (d *Pool) NewConn(mode connMode, pgConnString string) error {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
100
101
	d.mu.Lock()
	defer d.mu.Unlock()
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
102

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
103
104
105
106
	q, err := d.newConn(mode, pgConnString)

	switch mode {
	case ConnModeMaster:
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
107
		d.SrvMaster = q
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
108
	case ConnModeSync:
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
109
		d.SrvSync = append(d.SrvSync, q)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
110
	case ConnModeAsync:
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
111
		d.SrvAsync = append(d.SrvAsync, q)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
112
113
	default:
		return errors.New("unknown mode")
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
114
	}
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
115
116

	return err
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
117
}
Vladimir Barsukov's avatar
Vladimir Barsukov committed
118
119
120
121
122
123
124
125
126
127
128

// NewConns registers every connection string under mode, in the order given,
// by calling NewConn for each. It stops at the first failure and returns that
// error; strings after the failing one are not attempted.
func (d *Pool) NewConns(mode connMode, pgConnString ...string) error {
	for i := 0; i < len(pgConnString); i++ {
		err := d.NewConn(mode, pgConnString[i])
		if err != nil {
			return err
		}
	}

	return nil
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
129
func (d *Pool) newConn(mode connMode, pgConnString string) (q *Conn, err error) {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
130
	var pgxPool *pgxpool.Pool
Vladimir Barsukov's avatar
Vladimir Barsukov committed
131
	var pgxConfig *pgxpool.Config
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
132
133
134
135
136

	if !strings.Contains(pgConnString, "default_query_exec_mode=") {
		pgConnString += " default_query_exec_mode=simple_protocol"
	}

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
137
138
139
140
	if !strings.Contains(pgConnString, "connect_timeout=") {
		pgConnString += " connect_timeout=3"
	}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
141
142
143
144
145
	if pgxConfig, err = pgxpool.ParseConfig(pgConnString); err != nil {
		return nil, err
	}

	if pgxPool, err = pgxpool.NewWithConfig(d.ctx, pgxConfig); err != nil {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
146
		return &Conn{Pool: pgxPool, Alive: false, Mode: mode}, err
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
147
148
	}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
149
	q = &Conn{Pool: pgxPool, Alive: false, Mode: mode}
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
150

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
151
	if err = d.ping(q); err != nil {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
152
		return q, err
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
153
154
	}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
155
	q.Alive = true
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
156

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
157
	return q, nil
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
158
159
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
160
func (d *Pool) sync() *Conn {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
161
162
	if len(d.SrvSync) == 0 {
		return d.SrvMaster
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
163
164
165
166
167
	}

	d.mu.RLock()
	defer d.mu.RUnlock()

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
168
169
	if len(d.SrvSync) == 1 {
		return d.SrvSync[0]
Vladimir Barsukov's avatar
Vladimir Barsukov committed
170
171
172
	}

	if d.Balance == BalanceRoundRobin {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
173
		return d.SrvSync[d.slavesIter.Add(1)%int64(len(d.SrvSync))]
Vladimir Barsukov's avatar
Vladimir Barsukov committed
174
175
176
177
178
	}

	var out *Conn
	var min int32 = 1 << 30

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
179
	for _, conn := range d.SrvSync {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
180
181
182
183
184
185
186
		if conn.Stat().AcquiredConns() < min {
			min = conn.Stat().AcquiredConns()
			out = conn
		}
	}

	return out
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
187
188
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
189
func (d *Pool) async() *Conn {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
190
	if len(d.SrvAsync) == 0 {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
191
		return d.sync()
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
192
193
194
195
196
	}

	d.mu.RLock()
	defer d.mu.RUnlock()

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
197
198
	if len(d.SrvAsync) == 1 {
		return d.SrvAsync[0]
Vladimir Barsukov's avatar
Vladimir Barsukov committed
199
200
201
	}

	if d.Balance == BalanceRoundRobin {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
202
		return d.SrvAsync[d.slavesAsyncIter.Add(1)%int64(len(d.SrvAsync))]
Vladimir Barsukov's avatar
Vladimir Barsukov committed
203
204
205
206
207
	}

	var out *Conn
	var min int32 = 1 << 30

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
208
	for _, conn := range d.SrvAsync {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
209
210
211
212
213
214
215
		if conn.Stat().AcquiredConns() < min {
			min = conn.Stat().AcquiredConns()
			out = conn
		}
	}

	return out
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
216
217
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
218
func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *Conn, dst1 any) error) error {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
219
	for {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
220
		var q *Conn
Vladimir Barsukov's avatar
Vladimir Barsukov committed
221
		try := 0
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
222
223

		if pool == ConnModeSync {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
224
			q = d.sync()
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
225
		} else {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
226
			q = d.async()
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
227
228
		}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
229
	repeat:
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
230
231
		if err := f(q, dst); err != nil {
			if q.Mode == ConnModeMaster {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
232
233
				return err
			} else {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
234
235
236

				if try < d.TryOnError && contains(err.Error(), d.ContinuesTry) {
					try++
Vladimir Barsukov's avatar
Vladimir Barsukov committed
237
238
239
					d.logger.Printf("DB_EXEC_WRAPPER_REPEAT_ERR: SRV: %s TRY: %d; %s", q.ToString(), try, err.Error())

					time.Sleep(d.TryOnSleep)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
240
					goto repeat
Vladimir Barsukov's avatar
Vladimir Barsukov committed
241
242
243
				}

				if contains(err.Error(), d.Continues) {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
244
					d.setNotAliveConn(q)
Vladimir Barsukov's avatar
Vladimir Barsukov committed
245
					d.logger.Printf("DB_EXEC_WRAPPER_ERR: SRV: %s; %s", q.ToString(), err.Error())
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
246
247
248
249
250
251
252
253
254
255
256
					continue
				} else {
					return err
				}
			}
		}

		return nil
	}
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
257
func (d *Pool) ping(q *Conn) (err error) {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
258
259
	var n any

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
260
	if err = d.qGet(q, &n, "SELECT 1"); err != nil {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
261
262
		d.logger.Printf("DB_PING_ERR: SRV: %s; %v",
			q.ToString(),
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
263
264
265
266
267
268
269
			err,
		)
	}

	return
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
270
func (d *Pool) setNotAliveConn(conn *Conn) {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
271
272
273
	d.mu.Lock()
	defer d.mu.Unlock()

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
274
	for i, slave := range d.SrvSync {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
275
276
277
		if slave == conn {
			conn.Alive = false
			d.notAliveConns = append(d.notAliveConns, conn)
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
278
			d.SrvSync = remove(d.SrvSync, i)
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
279
280
281
282
			return
		}
	}

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
283
	for i, slave := range d.SrvAsync {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
284
285
286
		if slave == conn {
			conn.Alive = false
			d.notAliveConns = append(d.notAliveConns, conn)
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
287
			d.SrvAsync = remove(d.SrvAsync, i)
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
288
289
290
291
292
293

			return
		}
	}
}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
294
295
296
func (d *Pool) Start() {
	d.stop = false

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
297
298
	go func() {
		for {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
299
300
301
302
			if d.stop {
				return
			}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
303
			for i, q := range d.notAliveConns {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
304
				if err := d.ping(q); err == nil {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
305
					d.mu.Lock()
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
306
					q.Alive = true
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
307
					d.notAliveConns = remove(d.notAliveConns, i)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
308
					if q.Mode == ConnModeSync {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
309
						d.SrvSync = append(d.SrvSync, q)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
310
					} else if q.Mode == ConnModeAsync {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
311
						d.SrvAsync = append(d.SrvAsync, q)
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
312
313
314
315
316
					}
					d.mu.Unlock()
				}
			}

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
317
318
			if d.SrvMaster != nil {
				d.SrvMaster.Alive = d.ping(d.SrvMaster) == nil
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
319
			}
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
320

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
321
			time.Sleep(time.Second)
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
322
323
324
325
		}
	}()
}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
326
327
328
329
// Stop sets the stop flag. The goroutine launched by Start exits the next
// time it checks the flag, which happens at the top of each loop pass.
// NOTE(review): the flag is a plain bool written here and read by that
// goroutine without synchronization — confirm whether an atomic is wanted.
func (d *Pool) Stop() {
	d.stop = true
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
330
func (d *Pool) IsAlive() bool {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
331
	return d.SrvMaster != nil && d.SrvMaster.Alive
Vladimir Barsukov's avatar
Vladimir Barsukov committed
332
333
}

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
// prepare renders sql by replacing every ":name:" placeholder with the SQL
// literal for param[name] and returns the resulting statement.
//
// Literal forms:
//   - nil, nil pointers and the string "NULL" become NULL;
//   - bool becomes TRUE or FALSE;
//   - int, int64 and pointers to int, int64 or float64 are written unquoted;
//   - time.Time is written in UTC as 'YYYY-MM-DD hh:mm:ss';
//   - strings are single-quoted with embedded quotes doubled;
//   - slices and arrays become a quoted array literal such as '{1,2,3}';
//   - anything else is formatted with %v, single-quoted and escaped.
//
// All placeholders are substituted in one pass, so placeholder-looking text
// inside a value is never expanded again and the result does not depend on
// map iteration order. When param contains the key "_debug", the rendered
// statement is logged.
func (d *Pool) prepare(sql string, param map[string]any) string {
	pairs := make([]string, 0, 2*len(param))
	for name, value := range param {
		pairs = append(pairs, ":"+name+":", prepareLiteral(value))
	}

	sql = strings.NewReplacer(pairs...).Replace(sql)

	if _, ok := param["_debug"]; ok {
		// The statement is data, not a format string: a '%' in it must be
		// logged verbatim.
		d.logger.Printf("%s", sql)
	}

	return sql
}

// prepareLiteral renders a single parameter value as SQL literal text.
func prepareLiteral(value any) string {
	const null = "NULL"

	switch v := value.(type) {
	case nil:
		return null
	case string:
		if v == null {
			return null
		}

		return prepareQuote(v)
	case bool:
		if v {
			return "TRUE"
		}

		return "FALSE"
	case int, int64:
		return fmt.Sprint(v)
	case *float64:
		// Unlike a plain float64, a *float64 is written unquoted.
		if v == nil {
			return null
		}

		return fmt.Sprint(*v)
	case time.Time:
		return prepareQuote(v.UTC().Format(time.DateTime))
	}

	rv := reflect.ValueOf(value)

	switch rv.Kind() {
	case reflect.Pointer:
		if rv.IsNil() {
			return null
		}

		// Render the pointed-to value: covers *string, *int, *bool,
		// *time.Time and any other pointer type.
		return prepareLiteral(rv.Elem().Interface())
	case reflect.Slice, reflect.Array:
		return prepareQuote(prepareArray(rv))
	}

	return prepareQuote(fmt.Sprint(value))
}

// prepareQuote wraps s in single quotes, doubling any embedded single quote.
func prepareQuote(s string) string {
	return "'" + strings.ReplaceAll(s, "'", "''") + "'"
}

// prepareArray renders a slice or array as the body of a PostgreSQL array
// literal, braces included, recursing into nested slices.
func prepareArray(rv reflect.Value) string {
	var b strings.Builder

	b.WriteByte('{')

	for i := 0; i < rv.Len(); i++ {
		if i > 0 {
			b.WriteByte(',')
		}

		b.WriteString(prepareArrayItem(rv.Index(i)))
	}

	b.WriteByte('}')

	return b.String()
}

// prepareArrayItem renders one array element: numbers and booleans bare,
// nil and the string "NULL" as NULL, everything else double-quoted with
// backslashes and double quotes escaped.
func prepareArrayItem(el reflect.Value) string {
	for el.Kind() == reflect.Interface || el.Kind() == reflect.Pointer {
		if el.IsNil() {
			return "NULL"
		}

		el = el.Elem()
	}

	switch el.Kind() {
	case reflect.Slice, reflect.Array:
		return prepareArray(el)
	case reflect.Bool,
		reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
		reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64,
		reflect.Float32, reflect.Float64:
		return fmt.Sprint(el.Interface())
	}

	var text string
	if ts, ok := el.Interface().(time.Time); ok {
		text = ts.UTC().Format(time.DateTime)
	} else {
		text = fmt.Sprint(el.Interface())
	}

	// Same convention as for scalar strings.
	if el.Kind() == reflect.String && text == "NULL" {
		return "NULL"
	}

	text = strings.ReplaceAll(text, `\`, `\\`)
	text = strings.ReplaceAll(text, `"`, `\"`)

	return `"` + text + `"`
}