pool.go 8.64 KB
Newer Older
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package zdb

import (
	"context"
	"errors"
	"fmt"
	"github.com/jackc/pgx/v5/pgxpool"
	"log"
	"reflect"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

Vladimir Barsukov's avatar
Vladimir Barsukov committed
16
17
18
19
20
21
22
// Balance selects how a replica is chosen when more than one is registered.
type Balance int

const (
	// BalanceRoundRobin walks the replicas in turn using a shared counter.
	BalanceRoundRobin Balance = iota
	// BalanceLeastConn picks the replica whose pool reports the fewest
	// acquired connections.
	BalanceLeastConn
)

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
23
type Pool struct {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
24
25
26
27
28
29
30
	ctx             context.Context
	logger          Logger
	SrvMaster       *Conn
	SrvSync         []*Conn
	SrvAsync        []*Conn
	mu              *sync.RWMutex
	notAliveConns   []*Conn
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
31
32
	slavesIter      *atomic.Int64
	slavesAsyncIter *atomic.Int64
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
33
34
35
36
37
38
39
	stop            bool
	PgTsFormat      string
	Continues       []string
	ContinuesTry    []string
	TryOnError      int
	TryOnSleep      time.Duration
	Balance         Balance
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
40
41
}

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
42
func New(ctx context.Context) *Pool {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
43
	return &Pool{
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
44
		ctx:             ctx,
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
45
46
47
		mu:              &sync.RWMutex{},
		slavesIter:      &atomic.Int64{},
		slavesAsyncIter: &atomic.Int64{},
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
48
		PgTsFormat:      "2006-01-02 15:04:05",
Vladimir Barsukov's avatar
Vladimir Barsukov committed
49
50
51
		Continues:       []string{"connect", "EOF", "conflict with recovery"},
		ContinuesTry:    []string{"conflict with recovery"},
		TryOnError:      1,
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
52
		TryOnSleep:      time.Second,
Vladimir Barsukov's avatar
Vladimir Barsukov committed
53
		Balance:         BalanceLeastConn,
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
54
55
56
	}
}

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
57
func NewDefault() *Pool {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
58
	p := New(context.Background())
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
59
60
61
	p.logger = log.Default()

	return p
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
62
63
64
65
66
}

func (d *Pool) WithContext(ctx context.Context) *Pool {
	return &Pool{
		ctx:             ctx,
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
67
68
69
70
		logger:          d.logger,
		SrvMaster:       d.SrvMaster,
		SrvSync:         d.SrvSync,
		SrvAsync:        d.SrvAsync,
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
71
72
73
74
		mu:              d.mu,
		notAliveConns:   d.notAliveConns,
		slavesIter:      d.slavesIter,
		slavesAsyncIter: d.slavesAsyncIter,
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
75
76
77
78
79
80
81
		stop:            d.stop,
		PgTsFormat:      d.PgTsFormat,
		Continues:       d.Continues,
		ContinuesTry:    d.ContinuesTry,
		TryOnError:      d.TryOnError,
		TryOnSleep:      d.TryOnSleep,
		Balance:         d.Balance,
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
82
83
84
	}
}

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
85
func (d *Pool) WithTimeout(dur time.Duration) *Pool {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
86
	ctx, _ := context.WithTimeout(d.ctx, dur)
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
87
88
89
90
91

	return d.WithContext(ctx)
}

func (d *Pool) WithDeadline(dur time.Time) *Pool {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
92
	ctx, _ := context.WithDeadline(d.ctx, dur)
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
93
94
95
96

	return d.WithContext(ctx)
}

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
97
func (d *Pool) NewConn(mode connMode, pgConnString string) error {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
98
99
	d.mu.Lock()
	defer d.mu.Unlock()
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
100

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
101
102
103
104
	q, err := d.newConn(mode, pgConnString)

	switch mode {
	case ConnModeMaster:
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
105
		d.SrvMaster = q
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
106
	case ConnModeSync:
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
107
		d.SrvSync = append(d.SrvSync, q)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
108
	case ConnModeAsync:
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
109
		d.SrvAsync = append(d.SrvAsync, q)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
110
111
	default:
		return errors.New("unknown mode")
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
112
	}
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
113
114

	return err
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
115
}
Vladimir Barsukov's avatar
Vladimir Barsukov committed
116
117
118
119
120
121
122
123
124
125
126

// NewConns registers every connection string under mode, in the order given.
// It stops at the first NewConn failure and returns that error; strings after
// the failing one are not processed.
func (d *Pool) NewConns(mode connMode, pgConnString ...string) error {
	for i := 0; i < len(pgConnString); i++ {
		err := d.NewConn(mode, pgConnString[i])
		if err != nil {
			return err
		}
	}

	return nil
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
127
func (d *Pool) newConn(mode connMode, pgConnString string) (q *Conn, err error) {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
128
	var pgxPool *pgxpool.Pool
Vladimir Barsukov's avatar
Vladimir Barsukov committed
129
	var pgxConfig *pgxpool.Config
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
130
131
132
133
134

	if !strings.Contains(pgConnString, "default_query_exec_mode=") {
		pgConnString += " default_query_exec_mode=simple_protocol"
	}

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
135
136
137
138
	if !strings.Contains(pgConnString, "connect_timeout=") {
		pgConnString += " connect_timeout=3"
	}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
139
140
141
142
143
	if pgxConfig, err = pgxpool.ParseConfig(pgConnString); err != nil {
		return nil, err
	}

	if pgxPool, err = pgxpool.NewWithConfig(d.ctx, pgxConfig); err != nil {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
144
		return &Conn{Pool: pgxPool, Alive: false, Mode: mode}, err
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
145
146
	}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
147
	q = &Conn{Pool: pgxPool, Alive: false, Mode: mode}
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
148

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
149
	if err = d.ping(q); err != nil {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
150
		return q, err
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
151
152
	}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
153
	q.Alive = true
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
154

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
155
	return q, nil
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
156
157
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
158
func (d *Pool) sync() *Conn {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
159
160
	if len(d.SrvSync) == 0 {
		return d.SrvMaster
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
161
162
163
164
165
	}

	d.mu.RLock()
	defer d.mu.RUnlock()

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
166
167
	if len(d.SrvSync) == 1 {
		return d.SrvSync[0]
Vladimir Barsukov's avatar
Vladimir Barsukov committed
168
169
170
	}

	if d.Balance == BalanceRoundRobin {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
171
		return d.SrvSync[d.slavesIter.Add(1)%int64(len(d.SrvSync))]
Vladimir Barsukov's avatar
Vladimir Barsukov committed
172
173
174
175
176
	}

	var out *Conn
	var min int32 = 1 << 30

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
177
	for _, conn := range d.SrvSync {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
178
179
180
181
182
183
184
		if conn.Stat().AcquiredConns() < min {
			min = conn.Stat().AcquiredConns()
			out = conn
		}
	}

	return out
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
185
186
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
187
func (d *Pool) async() *Conn {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
188
	if len(d.SrvAsync) == 0 {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
189
		return d.sync()
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
190
191
192
193
194
	}

	d.mu.RLock()
	defer d.mu.RUnlock()

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
195
196
	if len(d.SrvAsync) == 1 {
		return d.SrvAsync[0]
Vladimir Barsukov's avatar
Vladimir Barsukov committed
197
198
199
	}

	if d.Balance == BalanceRoundRobin {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
200
		return d.SrvAsync[d.slavesAsyncIter.Add(1)%int64(len(d.SrvAsync))]
Vladimir Barsukov's avatar
Vladimir Barsukov committed
201
202
203
204
205
	}

	var out *Conn
	var min int32 = 1 << 30

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
206
	for _, conn := range d.SrvAsync {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
207
208
209
210
211
212
213
		if conn.Stat().AcquiredConns() < min {
			min = conn.Stat().AcquiredConns()
			out = conn
		}
	}

	return out
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
214
215
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
216
func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *Conn, dst1 any) error) error {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
217
	for {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
218
		var q *Conn
Vladimir Barsukov's avatar
Vladimir Barsukov committed
219
		try := 0
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
220
221

		if pool == ConnModeSync {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
222
			q = d.sync()
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
223
		} else {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
224
			q = d.async()
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
225
226
		}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
227
	repeat:
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
228
229
		if err := f(q, dst); err != nil {
			if q.Mode == ConnModeMaster {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
230
231
				return err
			} else {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
232
233
234

				if try < d.TryOnError && contains(err.Error(), d.ContinuesTry) {
					try++
Vladimir Barsukov's avatar
Vladimir Barsukov committed
235
236
237
					d.logger.Printf("DB_EXEC_WRAPPER_REPEAT_ERR: SRV: %s TRY: %d; %s", q.ToString(), try, err.Error())

					time.Sleep(d.TryOnSleep)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
238
					goto repeat
Vladimir Barsukov's avatar
Vladimir Barsukov committed
239
240
241
				}

				if contains(err.Error(), d.Continues) {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
242
					d.setNotAliveConn(q)
Vladimir Barsukov's avatar
Vladimir Barsukov committed
243
					d.logger.Printf("DB_EXEC_WRAPPER_ERR: SRV: %s; %s", q.ToString(), err.Error())
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
244
245
246
247
248
249
250
251
252
253
254
					continue
				} else {
					return err
				}
			}
		}

		return nil
	}
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
255
func (d *Pool) ping(q *Conn) (err error) {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
256
257
	var n any

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
258
	if err = d.qGet(q, &n, "SELECT 1"); err != nil {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
259
260
		d.logger.Printf("DB_PING_ERR: SRV: %s; %v",
			q.ToString(),
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
261
262
263
264
265
266
267
			err,
		)
	}

	return
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
268
func (d *Pool) setNotAliveConn(conn *Conn) {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
269
270
271
	d.mu.Lock()
	defer d.mu.Unlock()

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
272
	for i, slave := range d.SrvSync {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
273
274
275
		if slave == conn {
			conn.Alive = false
			d.notAliveConns = append(d.notAliveConns, conn)
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
276
			d.SrvSync = remove(d.SrvSync, i)
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
277
278
279
280
			return
		}
	}

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
281
	for i, slave := range d.SrvAsync {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
282
283
284
		if slave == conn {
			conn.Alive = false
			d.notAliveConns = append(d.notAliveConns, conn)
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
285
			d.SrvAsync = remove(d.SrvAsync, i)
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
286
287
288
289
290
291

			return
		}
	}
}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
292
293
294
func (d *Pool) Start() {
	d.stop = false

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
295
296
	go func() {
		for {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
297
298
299
300
			if d.stop {
				return
			}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
301
			for i, q := range d.notAliveConns {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
302
				if err := d.ping(q); err == nil {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
303
					d.mu.Lock()
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
304
					q.Alive = true
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
305
					d.notAliveConns = remove(d.notAliveConns, i)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
306
					if q.Mode == ConnModeSync {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
307
						d.SrvSync = append(d.SrvSync, q)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
308
					} else if q.Mode == ConnModeAsync {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
309
						d.SrvAsync = append(d.SrvAsync, q)
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
310
311
312
313
314
					}
					d.mu.Unlock()
				}
			}

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
315
316
			if d.SrvMaster != nil {
				d.SrvMaster.Alive = d.ping(d.SrvMaster) == nil
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
317
			}
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
318

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
319
			time.Sleep(time.Second)
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
320
321
322
323
		}
	}()
}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
324
325
326
327
// Stop sets the stop flag; the goroutine launched by Start returns the next
// time it checks the flag at the top of its loop.
func (d *Pool) Stop() {
	d.stop = true
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
328
func (d *Pool) IsAlive() bool {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
329
	return d.SrvMaster != nil && d.SrvMaster.Alive
Vladimir Barsukov's avatar
Vladimir Barsukov committed
330
331
}

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
// prepare substitutes every ":name:" placeholder in sql with the SQL literal
// of param["name"] and returns the resulting statement.
//
// All placeholders are replaced in a single pass over sql, so text that came
// from a parameter value is never scanned for further placeholders and the
// result does not depend on map iteration order.
//
// If param contains the key "_debug", the final statement is logged.
func (d *Pool) prepare(sql string, param map[string]any) string {
	pairs := make([]string, 0, 2*len(param))
	for name, value := range param {
		pairs = append(pairs, ":"+name+":", prepareLiteral(value))
	}

	sql = strings.NewReplacer(pairs...).Replace(sql)

	if _, ok := param["_debug"]; ok {
		// The statement is data, not a format string: it may contain '%'.
		d.logger.Printf("%s", sql)
	}

	return sql
}

// prepareLiteral renders one parameter value as a SQL literal:
//   - nil and nil pointers of the supported types become NULL;
//   - time values are rendered in UTC as 'YYYY-MM-DD hh:mm:ss';
//   - booleans become TRUE / FALSE;
//   - the string "NULL" becomes NULL, other strings are single-quoted with
//     embedded quotes doubled;
//   - int, int64 and the *int, *int64, *float64 pointees are written bare;
//   - slices become a quoted '{a,b,c}' array literal;
//   - anything else is formatted with %v and single-quoted.
func prepareLiteral(value any) string {
	switch v := value.(type) {
	case nil:
		return "NULL"
	case time.Time:
		return "'" + v.UTC().Format(time.DateTime) + "'"
	case *time.Time:
		if v == nil {
			return "NULL"
		}

		return "'" + v.UTC().Format(time.DateTime) + "'"
	case bool:
		return prepareBool(v)
	case *bool:
		if v == nil {
			return "NULL"
		}

		return prepareBool(*v)
	case string:
		return prepareString(v)
	case *string:
		if v == nil {
			return "NULL"
		}

		return prepareString(*v)
	case int, int64:
		return fmt.Sprint(v)
	case *int:
		if v == nil {
			return "NULL"
		}

		return fmt.Sprint(*v)
	case *int64:
		if v == nil {
			return "NULL"
		}

		return fmt.Sprint(*v)
	case *float64:
		if v == nil {
			return "NULL"
		}

		return fmt.Sprint(*v)
	}

	text := fmt.Sprint(value)
	if reflect.TypeOf(value).Kind() == reflect.Slice {
		// fmt prints a slice as "[a b c]"; turn that into "{a,b,c}".
		text = "{" + strings.Trim(strings.ReplaceAll(text, " ", ","), "[]") + "}"
	}

	// Double embedded quotes so the value cannot terminate the literal.
	return "'" + strings.ReplaceAll(text, "'", "''") + "'"
}

// prepareBool renders a boolean as the SQL keyword TRUE or FALSE.
func prepareBool(b bool) string {
	if b {
		return "TRUE"
	}

	return "FALSE"
}

// prepareString renders s as a single-quoted SQL string with embedded quotes
// doubled; the exact text "NULL" is rendered as the NULL keyword.
func prepareString(s string) string {
	if s == "NULL" {
		return "NULL"
	}

	return "'" + strings.ReplaceAll(s, "'", "''") + "'"
}