pool.go 8.73 KB
Newer Older
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package zdb

import (
	"context"
	"errors"
	"fmt"
	"github.com/jackc/pgx/v5/pgxpool"
	"log"
	"reflect"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

Vladimir Barsukov's avatar
Vladimir Barsukov committed
16
17
18
19
20
21
22
// Balance selects how a replica is picked when more than one is registered
// for the requested mode.
type Balance int

const (
	// BalanceRoundRobin cycles through the replicas using a shared counter.
	BalanceRoundRobin Balance = iota
	// BalanceLeastConn picks the replica whose pool currently reports the
	// fewest acquired connections.
	BalanceLeastConn
)

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
23
type Pool struct {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
24
25
26
27
28
29
30
	ctx             context.Context
	logger          Logger
	SrvMaster       *Conn
	SrvSync         []*Conn
	SrvAsync        []*Conn
	mu              *sync.RWMutex
	notAliveConns   []*Conn
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
31
32
	slavesIter      *atomic.Int64
	slavesAsyncIter *atomic.Int64
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
33
34
35
36
37
38
39
	stop            bool
	PgTsFormat      string
	Continues       []string
	ContinuesTry    []string
	TryOnError      int
	TryOnSleep      time.Duration
	Balance         Balance
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
40
41
}

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
42
func New(ctx context.Context) *Pool {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
43
	return &Pool{
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
44
		ctx:             ctx,
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
45
46
47
		mu:              &sync.RWMutex{},
		slavesIter:      &atomic.Int64{},
		slavesAsyncIter: &atomic.Int64{},
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
48
		PgTsFormat:      "2006-01-02 15:04:05",
Vladimir Barsukov's avatar
Vladimir Barsukov committed
49
50
51
		Continues:       []string{"connect", "EOF", "conflict with recovery"},
		ContinuesTry:    []string{"conflict with recovery"},
		TryOnError:      1,
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
52
		TryOnSleep:      time.Second,
Vladimir Barsukov's avatar
Vladimir Barsukov committed
53
		Balance:         BalanceLeastConn,
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
54
55
56
	}
}

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
57
func NewDefault() *Pool {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
58
	p := New(context.Background())
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
59
60
61
	p.logger = log.Default()

	return p
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
62
63
64
65
66
}

func (d *Pool) WithContext(ctx context.Context) *Pool {
	return &Pool{
		ctx:             ctx,
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
67
68
69
70
		logger:          d.logger,
		SrvMaster:       d.SrvMaster,
		SrvSync:         d.SrvSync,
		SrvAsync:        d.SrvAsync,
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
71
72
73
74
		mu:              d.mu,
		notAliveConns:   d.notAliveConns,
		slavesIter:      d.slavesIter,
		slavesAsyncIter: d.slavesAsyncIter,
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
75
76
77
78
79
80
81
		stop:            d.stop,
		PgTsFormat:      d.PgTsFormat,
		Continues:       d.Continues,
		ContinuesTry:    d.ContinuesTry,
		TryOnError:      d.TryOnError,
		TryOnSleep:      d.TryOnSleep,
		Balance:         d.Balance,
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
82
83
84
	}
}

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
85
func (d *Pool) WithTimeout(dur time.Duration) *Pool {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
86
	ctx, _ := context.WithTimeout(d.ctx, dur)
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
87
88
89
90
91

	return d.WithContext(ctx)
}

func (d *Pool) WithDeadline(dur time.Time) *Pool {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
92
	ctx, _ := context.WithDeadline(d.ctx, dur)
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
93
94
95
96

	return d.WithContext(ctx)
}

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
97
func (d *Pool) NewConn(mode connMode, pgConnString string) error {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
98
99
	d.mu.Lock()
	defer d.mu.Unlock()
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
100

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
101
102
103
104
	q, err := d.newConn(mode, pgConnString)

	switch mode {
	case ConnModeMaster:
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
105
		d.SrvMaster = q
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
106
	case ConnModeSync:
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
107
		d.SrvSync = append(d.SrvSync, q)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
108
	case ConnModeAsync:
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
109
		d.SrvAsync = append(d.SrvAsync, q)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
110
111
	default:
		return errors.New("unknown mode")
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
112
	}
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
113
114

	return err
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
115
}
Vladimir Barsukov's avatar
Vladimir Barsukov committed
116
117
118
119
120
121
122
123
124
125
126

// NewConns registers every connection string in pgConnString under mode by
// calling NewConn on each, in order. It stops at the first failure and
// returns that error; connection strings after the failing one are not
// attempted.
func (d *Pool) NewConns(mode connMode, pgConnString ...string) error {
	for i := 0; i < len(pgConnString); i++ {
		err := d.NewConn(mode, pgConnString[i])
		if err != nil {
			return err
		}
	}

	return nil
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
127
func (d *Pool) newConn(mode connMode, pgConnString string) (q *Conn, err error) {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
128
	var pgxPool *pgxpool.Pool
Vladimir Barsukov's avatar
Vladimir Barsukov committed
129
	var pgxConfig *pgxpool.Config
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
130
131
132
133
134

	if !strings.Contains(pgConnString, "default_query_exec_mode=") {
		pgConnString += " default_query_exec_mode=simple_protocol"
	}

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
135
136
137
138
	if !strings.Contains(pgConnString, "connect_timeout=") {
		pgConnString += " connect_timeout=3"
	}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
139
140
141
142
	if !strings.Contains(pgConnString, "sslmode=") {
		pgConnString += " sslmode=disable"
	}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
143
144
145
146
147
	if pgxConfig, err = pgxpool.ParseConfig(pgConnString); err != nil {
		return nil, err
	}

	if pgxPool, err = pgxpool.NewWithConfig(d.ctx, pgxConfig); err != nil {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
148
		return &Conn{Pool: pgxPool, Alive: false, Mode: mode}, err
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
149
150
	}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
151
	q = &Conn{Pool: pgxPool, Alive: false, Mode: mode}
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
152

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
153
	if err = d.ping(q); err != nil {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
154
		return q, err
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
155
156
	}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
157
	q.Alive = true
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
158

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
159
	return q, nil
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
160
161
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
162
func (d *Pool) sync() *Conn {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
163
164
	if len(d.SrvSync) == 0 {
		return d.SrvMaster
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
165
166
167
168
169
	}

	d.mu.RLock()
	defer d.mu.RUnlock()

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
170
171
	if len(d.SrvSync) == 1 {
		return d.SrvSync[0]
Vladimir Barsukov's avatar
Vladimir Barsukov committed
172
173
174
	}

	if d.Balance == BalanceRoundRobin {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
175
		return d.SrvSync[d.slavesIter.Add(1)%int64(len(d.SrvSync))]
Vladimir Barsukov's avatar
Vladimir Barsukov committed
176
177
178
179
180
	}

	var out *Conn
	var min int32 = 1 << 30

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
181
	for _, conn := range d.SrvSync {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
182
183
184
185
186
187
188
		if conn.Stat().AcquiredConns() < min {
			min = conn.Stat().AcquiredConns()
			out = conn
		}
	}

	return out
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
189
190
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
191
func (d *Pool) async() *Conn {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
192
	if len(d.SrvAsync) == 0 {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
193
		return d.sync()
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
194
195
196
197
198
	}

	d.mu.RLock()
	defer d.mu.RUnlock()

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
199
200
	if len(d.SrvAsync) == 1 {
		return d.SrvAsync[0]
Vladimir Barsukov's avatar
Vladimir Barsukov committed
201
202
203
	}

	if d.Balance == BalanceRoundRobin {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
204
		return d.SrvAsync[d.slavesAsyncIter.Add(1)%int64(len(d.SrvAsync))]
Vladimir Barsukov's avatar
Vladimir Barsukov committed
205
206
207
208
209
	}

	var out *Conn
	var min int32 = 1 << 30

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
210
	for _, conn := range d.SrvAsync {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
211
212
213
214
215
216
217
		if conn.Stat().AcquiredConns() < min {
			min = conn.Stat().AcquiredConns()
			out = conn
		}
	}

	return out
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
218
219
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
220
func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *Conn, dst1 any) error) error {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
221
	for {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
222
		var q *Conn
Vladimir Barsukov's avatar
Vladimir Barsukov committed
223
		try := 0
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
224
225

		if pool == ConnModeSync {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
226
			q = d.sync()
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
227
		} else {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
228
			q = d.async()
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
229
230
		}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
231
	repeat:
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
232
233
		if err := f(q, dst); err != nil {
			if q.Mode == ConnModeMaster {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
234
235
				return err
			} else {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
236
237
238

				if try < d.TryOnError && contains(err.Error(), d.ContinuesTry) {
					try++
Vladimir Barsukov's avatar
Vladimir Barsukov committed
239
240
241
					d.logger.Printf("DB_EXEC_WRAPPER_REPEAT_ERR: SRV: %s TRY: %d; %s", q.ToString(), try, err.Error())

					time.Sleep(d.TryOnSleep)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
242
					goto repeat
Vladimir Barsukov's avatar
Vladimir Barsukov committed
243
244
245
				}

				if contains(err.Error(), d.Continues) {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
246
					d.setNotAliveConn(q)
Vladimir Barsukov's avatar
Vladimir Barsukov committed
247
					d.logger.Printf("DB_EXEC_WRAPPER_ERR: SRV: %s; %s", q.ToString(), err.Error())
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
248
249
250
251
252
253
254
255
256
257
258
					continue
				} else {
					return err
				}
			}
		}

		return nil
	}
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
259
func (d *Pool) ping(q *Conn) (err error) {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
260
261
	var n any

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
262
	if err = d.qGet(q, &n, "SELECT 1"); err != nil {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
263
264
		d.logger.Printf("DB_PING_ERR: SRV: %s; %v",
			q.ToString(),
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
265
266
267
268
269
270
271
			err,
		)
	}

	return
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
272
func (d *Pool) setNotAliveConn(conn *Conn) {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
273
274
275
	d.mu.Lock()
	defer d.mu.Unlock()

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
276
	for i, slave := range d.SrvSync {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
277
278
279
		if slave == conn {
			conn.Alive = false
			d.notAliveConns = append(d.notAliveConns, conn)
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
280
			d.SrvSync = remove(d.SrvSync, i)
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
281
282
283
284
			return
		}
	}

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
285
	for i, slave := range d.SrvAsync {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
286
287
288
		if slave == conn {
			conn.Alive = false
			d.notAliveConns = append(d.notAliveConns, conn)
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
289
			d.SrvAsync = remove(d.SrvAsync, i)
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
290
291
292
293
294
295

			return
		}
	}
}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
296
297
298
func (d *Pool) Start() {
	d.stop = false

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
299
300
	go func() {
		for {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
301
302
303
304
			if d.stop {
				return
			}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
305
			for i, q := range d.notAliveConns {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
306
				if err := d.ping(q); err == nil {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
307
					d.mu.Lock()
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
308
					q.Alive = true
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
309
					d.notAliveConns = remove(d.notAliveConns, i)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
310
					if q.Mode == ConnModeSync {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
311
						d.SrvSync = append(d.SrvSync, q)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
312
					} else if q.Mode == ConnModeAsync {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
313
						d.SrvAsync = append(d.SrvAsync, q)
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
314
315
316
317
318
					}
					d.mu.Unlock()
				}
			}

Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
319
320
			if d.SrvMaster != nil {
				d.SrvMaster.Alive = d.ping(d.SrvMaster) == nil
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
321
			}
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
322

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
323
			time.Sleep(time.Second)
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
324
325
326
327
		}
	}()
}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
328
329
330
331
// Stop sets the stop flag. The goroutine launched by Start checks the flag at
// the top of each iteration, so it exits after its current pass and sleep
// rather than immediately.
func (d *Pool) Stop() {
	d.stop = true
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
332
func (d *Pool) IsAlive() bool {
Vladimir Barsukov's avatar
timeout    
Vladimir Barsukov committed
333
	return d.SrvMaster != nil && d.SrvMaster.Alive
Vladimir Barsukov's avatar
Vladimir Barsukov committed
334
335
}

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
// prepare substitutes every ":name:" placeholder in sql with the SQL literal
// of param["name"] and returns the resulting statement.
//
// All placeholders are replaced in a single pass over sql, so text coming
// from a parameter value is never scanned for placeholders again. When param
// contains the key "_debug" and a logger is configured, the final statement
// is logged.
func (d *Pool) prepare(sql string, param map[string]any) string {
	if len(param) > 0 {
		pairs := make([]string, 0, 2*len(param))
		for name, value := range param {
			pairs = append(pairs, ":"+name+":", prepareLiteral(value))
		}

		sql = strings.NewReplacer(pairs...).Replace(sql)
	}

	if _, ok := param["_debug"]; ok && d.logger != nil {
		// The statement is passed as an argument, not as the format string:
		// SQL may legitimately contain '%'.
		d.logger.Printf("%s", sql)
	}

	return sql
}

// prepareLiteral renders v as a SQL literal:
//   - nil and nil pointers become NULL;
//   - time values become a quoted UTC timestamp in time.DateTime layout;
//   - booleans become TRUE or FALSE;
//   - strings are single-quoted with embedded quotes doubled, except the
//     exact string "NULL", which becomes NULL;
//   - int, int64 and the supported numeric pointers are written bare;
//   - slices become a quoted '{a,b,c}' array literal built from their default
//     formatting, so an element containing a space is split into several
//     items;
//   - anything else is written with its default formatting, single-quoted
//     with embedded quotes doubled.
func prepareLiteral(v any) string {
	switch x := v.(type) {
	case nil:
		return "NULL"
	case time.Time:
		return "'" + x.UTC().Format(time.DateTime) + "'"
	case *time.Time:
		if x == nil {
			return "NULL"
		}
		return "'" + x.UTC().Format(time.DateTime) + "'"
	case bool:
		if x {
			return "TRUE"
		}
		return "FALSE"
	case *bool:
		if x == nil {
			return "NULL"
		}
		return prepareLiteral(*x)
	case string:
		if x == "NULL" {
			return "NULL"
		}
		return "'" + strings.ReplaceAll(x, "'", "''") + "'"
	case *string:
		if x == nil {
			return "NULL"
		}
		return prepareLiteral(*x)
	case int, int64:
		return fmt.Sprint(x)
	case *int:
		if x == nil {
			return "NULL"
		}
		return fmt.Sprint(*x)
	case *int64:
		if x == nil {
			return "NULL"
		}
		return fmt.Sprint(*x)
	case *float64:
		if x == nil {
			return "NULL"
		}
		return fmt.Sprint(*x)
	}

	rv := reflect.ValueOf(v)

	switch rv.Kind() {
	case reflect.Ptr:
		// A typed nil pointer of a type not listed above.
		if rv.IsNil() {
			return "NULL"
		}
	case reflect.Slice:
		items := strings.Trim(strings.ReplaceAll(fmt.Sprint(v), " ", ","), "[]")
		return "'{" + strings.ReplaceAll(items, "'", "''") + "}'"
	}

	return "'" + strings.ReplaceAll(fmt.Sprint(v), "'", "''") + "'"
}