pool.go 7.94 KB
Newer Older
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package zdb

import (
	"context"
	"errors"
	"fmt"
	"github.com/jackc/pgx/v5/pgxpool"
	"log"
	"reflect"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

Vladimir Barsukov's avatar
Vladimir Barsukov committed
16
17
18
19
20
21
22
// Balance selects how a replica is chosen when more than one is registered
// for the requested mode.
type Balance int

const (
	// BalanceRoundRobin cycles through the replicas using a shared counter.
	BalanceRoundRobin Balance = iota
	// BalanceLeastConn picks the replica whose pool reports the fewest
	// acquired connections.
	BalanceLeastConn
)

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
23
24
25
26
type Pool struct {
	ctx    context.Context
	logger Logger

Vladimir Barsukov's avatar
Vladimir Barsukov committed
27
28
29
	srvMaster *Conn
	srvSync   []*Conn
	srvAsync  []*Conn
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
30
31
32

	mu *sync.RWMutex

Vladimir Barsukov's avatar
Vladimir Barsukov committed
33
	notAliveConns []*Conn
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
34
35
36

	slavesIter      *atomic.Int64
	slavesAsyncIter *atomic.Int64
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
37
38

	stop bool
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
39
40

	PgTsFormat string
Vladimir Barsukov's avatar
Vladimir Barsukov committed
41
42
43
44

	Continues    []string
	ContinuesTry []string
	TryOnError   int
Vladimir Barsukov's avatar
Vladimir Barsukov committed
45
	TryOnSleep   time.Duration
Vladimir Barsukov's avatar
Vladimir Barsukov committed
46
	Balance      Balance
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
47
48
}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
49
50
51
52
53
func New() *Pool {
	return &Pool{
		mu:              &sync.RWMutex{},
		slavesIter:      &atomic.Int64{},
		slavesAsyncIter: &atomic.Int64{},
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
54
		PgTsFormat:      "2006-01-02 15:04:05",
Vladimir Barsukov's avatar
Vladimir Barsukov committed
55
56
57
		Continues:       []string{"connect", "EOF", "conflict with recovery"},
		ContinuesTry:    []string{"conflict with recovery"},
		TryOnError:      1,
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
58
		TryOnSleep:      time.Second,
Vladimir Barsukov's avatar
Vladimir Barsukov committed
59
		Balance:         BalanceLeastConn,
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
60
61
62
	}
}

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
63
func NewDefault() *Pool {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
64
65
66
67
68
	p := New()
	p.ctx = context.Background()
	p.logger = log.Default()

	return p
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
69
70
71
72
73
74
}

func (d *Pool) WithContext(ctx context.Context) *Pool {
	return &Pool{
		ctx:             ctx,
		srvMaster:       d.srvMaster,
Vladimir Barsukov's avatar
Vladimir Barsukov committed
75
76
		srvSync:         d.srvSync,
		srvAsync:        d.srvAsync,
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
77
78
79
80
81
82
83
84
		mu:              d.mu,
		notAliveConns:   d.notAliveConns,
		slavesIter:      d.slavesIter,
		slavesAsyncIter: d.slavesAsyncIter,
	}
}

func (d *Pool) NewConn(mode connMode, pgConnString string) error {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
85
86
	d.mu.Lock()
	defer d.mu.Unlock()
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
87

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
88
89
90
91
92
93
	q, err := d.newConn(mode, pgConnString)

	switch mode {
	case ConnModeMaster:
		d.srvMaster = q
	case ConnModeSync:
Vladimir Barsukov's avatar
Vladimir Barsukov committed
94
		d.srvSync = append(d.srvSync, q)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
95
	case ConnModeAsync:
Vladimir Barsukov's avatar
Vladimir Barsukov committed
96
		d.srvAsync = append(d.srvAsync, q)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
97
98
	default:
		return errors.New("unknown mode")
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
99
	}
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
100
101

	return err
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
102
}
Vladimir Barsukov's avatar
Vladimir Barsukov committed
103
104
105
106
107
108
109
110
111
112
113

// NewConns registers every connection string in pgConnString under the same
// mode, in order, by calling NewConn. It stops at the first failure and
// returns that error; connection strings after it are not attempted.
func (d *Pool) NewConns(mode connMode, pgConnString ...string) error {
	for i := 0; i < len(pgConnString); i++ {
		err := d.NewConn(mode, pgConnString[i])
		if err != nil {
			return err
		}
	}

	return nil
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
114
func (d *Pool) newConn(mode connMode, pgConnString string) (q *Conn, err error) {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
115
	var pgxPool *pgxpool.Pool
Vladimir Barsukov's avatar
Vladimir Barsukov committed
116
	var pgxConfig *pgxpool.Config
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
117
118
119
120
121

	if !strings.Contains(pgConnString, "default_query_exec_mode=") {
		pgConnString += " default_query_exec_mode=simple_protocol"
	}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
122
123
124
125
126
	if pgxConfig, err = pgxpool.ParseConfig(pgConnString); err != nil {
		return nil, err
	}

	if pgxPool, err = pgxpool.NewWithConfig(d.ctx, pgxConfig); err != nil {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
127
		return &Conn{Pool: pgxPool, Alive: false, Mode: mode}, err
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
128
129
	}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
130
	q = &Conn{Pool: pgxPool, Alive: false, Mode: mode}
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
131

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
132
	if err = d.ping(q); err != nil {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
133
		return q, err
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
134
135
	}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
136
	q.Alive = true
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
137

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
138
	return q, nil
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
139
140
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
141
142
func (d *Pool) sync() *Conn {
	if len(d.srvSync) == 0 {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
143
144
145
146
147
148
		return d.srvMaster
	}

	d.mu.RLock()
	defer d.mu.RUnlock()

Vladimir Barsukov's avatar
Vladimir Barsukov committed
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
	if len(d.srvSync) == 1 {
		return d.srvSync[0]
	}

	if d.Balance == BalanceRoundRobin {
		return d.srvSync[d.slavesIter.Add(1)%int64(len(d.srvSync))]
	}

	var out *Conn
	var min int32 = 1 << 30

	for _, conn := range d.srvSync {
		if conn.Stat().AcquiredConns() < min {
			min = conn.Stat().AcquiredConns()
			out = conn
		}
	}

	return out
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
168
169
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
170
171
func (d *Pool) async() *Conn {
	if len(d.srvAsync) == 0 {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
172
		return d.sync()
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
173
174
175
176
177
	}

	d.mu.RLock()
	defer d.mu.RUnlock()

Vladimir Barsukov's avatar
Vladimir Barsukov committed
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
	if len(d.srvAsync) == 1 {
		return d.srvAsync[0]
	}

	if d.Balance == BalanceRoundRobin {
		return d.srvAsync[d.slavesAsyncIter.Add(1)%int64(len(d.srvAsync))]
	}

	var out *Conn
	var min int32 = 1 << 30

	for _, conn := range d.srvAsync {
		if conn.Stat().AcquiredConns() < min {
			min = conn.Stat().AcquiredConns()
			out = conn
		}
	}

	return out
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
197
198
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
199
func (d *Pool) execWrapper(pool connMode, dst any, f func(conn *Conn, dst1 any) error) error {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
200
	for {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
201
		var q *Conn
Vladimir Barsukov's avatar
Vladimir Barsukov committed
202
		try := 0
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
203
204

		if pool == ConnModeSync {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
205
			q = d.sync()
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
206
		} else {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
207
			q = d.async()
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
208
209
		}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
210
	repeat:
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
211
212
		if err := f(q, dst); err != nil {
			if q.Mode == ConnModeMaster {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
213
214
				return err
			} else {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
215
216
217

				if try < d.TryOnError && contains(err.Error(), d.ContinuesTry) {
					try++
Vladimir Barsukov's avatar
Vladimir Barsukov committed
218
219
220
					d.logger.Printf("DB_EXEC_WRAPPER_REPEAT_ERR: SRV: %s TRY: %d; %s", q.ToString(), try, err.Error())

					time.Sleep(d.TryOnSleep)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
221
					goto repeat
Vladimir Barsukov's avatar
Vladimir Barsukov committed
222
223
224
				}

				if contains(err.Error(), d.Continues) {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
225
					d.setNotAliveConn(q)
Vladimir Barsukov's avatar
Vladimir Barsukov committed
226
					d.logger.Printf("DB_EXEC_WRAPPER_ERR: SRV: %s; %s", q.ToString(), err.Error())
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
227
228
229
230
231
232
233
234
235
236
237
					continue
				} else {
					return err
				}
			}
		}

		return nil
	}
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
238
func (d *Pool) ping(q *Conn) (err error) {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
239
240
	var n any

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
241
	if err = d.qGet(q, &n, "SELECT 1"); err != nil {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
242
243
		d.logger.Printf("DB_PING_ERR: SRV: %s; %v",
			q.ToString(),
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
244
245
246
247
248
249
250
			err,
		)
	}

	return
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
251
func (d *Pool) setNotAliveConn(conn *Conn) {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
252
253
254
	d.mu.Lock()
	defer d.mu.Unlock()

Vladimir Barsukov's avatar
Vladimir Barsukov committed
255
	for i, slave := range d.srvSync {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
256
257
258
		if slave == conn {
			conn.Alive = false
			d.notAliveConns = append(d.notAliveConns, conn)
Vladimir Barsukov's avatar
Vladimir Barsukov committed
259
			d.srvSync = remove(d.srvSync, i)
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
260
261
262
263
			return
		}
	}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
264
	for i, slave := range d.srvAsync {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
265
266
267
		if slave == conn {
			conn.Alive = false
			d.notAliveConns = append(d.notAliveConns, conn)
Vladimir Barsukov's avatar
Vladimir Barsukov committed
268
			d.srvAsync = remove(d.srvAsync, i)
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
269
270
271
272
273
274

			return
		}
	}
}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
275
276
277
func (d *Pool) Start() {
	d.stop = false

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
278
279
	go func() {
		for {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
280
281
282
283
			if d.stop {
				return
			}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
284
			for i, q := range d.notAliveConns {
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
285
				if err := d.ping(q); err == nil {
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
286
					d.mu.Lock()
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
287
					q.Alive = true
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
288
					d.notAliveConns = remove(d.notAliveConns, i)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
289
					if q.Mode == ConnModeSync {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
290
						d.srvSync = append(d.srvSync, q)
Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
291
					} else if q.Mode == ConnModeAsync {
Vladimir Barsukov's avatar
Vladimir Barsukov committed
292
						d.srvAsync = append(d.srvAsync, q)
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
293
294
295
296
297
					}
					d.mu.Unlock()
				}
			}

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
298
299
300
			if d.srvMaster != nil {
				d.srvMaster.Alive = d.ping(d.srvMaster) == nil
			}
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
301

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
302
			time.Sleep(time.Second)
Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
303
304
305
306
		}
	}()
}

Vladimir Barsukov's avatar
save    
Vladimir Barsukov committed
307
308
309
310
func (d *Pool) Stop() {
	d.stop = true
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
311
func (d *Pool) IsAlive() bool {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
312
	return d.srvMaster != nil && d.srvMaster.Alive
Vladimir Barsukov's avatar
Vladimir Barsukov committed
313
314
}

Vladimir Barsukov's avatar
init  
Vladimir Barsukov committed
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
// prepare substitutes every ":name:" placeholder in sql with the SQL literal
// for param["name"] and returns the resulting statement.
//
// All placeholders are replaced in a single left-to-right pass, so text that
// comes from a parameter value is never scanned for placeholders again.
// If param contains the key "_debug" the final statement is written to the
// pool logger.
//
// See sqlLiteral for how individual values are rendered.
func (d *Pool) prepare(sql string, param map[string]any) string {
	if len(param) > 0 {
		pairs := make([]string, 0, 2*len(param))

		for name, value := range param {
			pairs = append(pairs, ":"+name+":", sqlLiteral(value))
		}

		sql = strings.NewReplacer(pairs...).Replace(sql)
	}

	if _, ok := param["_debug"]; ok {
		// The statement is data, not a format string: it may contain '%'.
		d.logger.Printf("%s", sql)
	}

	return sql
}

// sqlLiteral renders v as a literal that can be spliced into a statement.
//
//   - nil, nil pointers and the string "NULL" become NULL
//   - bool becomes TRUE or FALSE
//   - int, int64 and *float64 are written unquoted
//   - time.Time is converted to UTC and written as 'YYYY-MM-DD hh:mm:ss'
//   - strings are single-quoted with embedded quotes doubled
//   - pointers are dereferenced and rendered as the value they point to
//   - slices become a quoted array literal such as '{1,2,3}'
//   - anything else is formatted with fmt and quoted like a string
func sqlLiteral(v any) string {
	switch t := v.(type) {
	case nil:
		return "NULL"
	case time.Time:
		return "'" + t.UTC().Format(time.DateTime) + "'"
	case bool:
		if t {
			return "TRUE"
		}

		return "FALSE"
	case string:
		if t == "NULL" {
			return "NULL"
		}

		return sqlQuote(t)
	case int, int64:
		return fmt.Sprint(t)
	case *float64:
		if t == nil {
			return "NULL"
		}

		return fmt.Sprint(*t)
	}

	rv := reflect.ValueOf(v)

	switch rv.Kind() {
	case reflect.Pointer:
		if rv.IsNil() {
			return "NULL"
		}

		return sqlLiteral(rv.Elem().Interface())
	case reflect.Slice:
		return sqlQuote(sqlArray(rv))
	}

	return sqlQuote(fmt.Sprint(v))
}

// sqlQuote wraps s in single quotes, doubling any single quote inside it.
func sqlQuote(s string) string {
	return "'" + strings.ReplaceAll(s, "'", "''") + "'"
}

// sqlArray renders the slice rv as the text of an array literal, e.g. {1,2,3},
// without the surrounding single quotes.
func sqlArray(rv reflect.Value) string {
	var b strings.Builder

	b.WriteByte('{')

	for i := 0; i < rv.Len(); i++ {
		if i > 0 {
			b.WriteByte(',')
		}

		b.WriteString(sqlArrayElem(rv.Index(i)))
	}

	b.WriteByte('}')

	return b.String()
}

// sqlArrayElem renders a single array element. Nil pointers and nil interfaces
// become NULL, nested slices become nested arrays, and a value is wrapped in
// double quotes (with backslash escaping) only when it is empty or contains a
// character that would otherwise change how the array text is split.
func sqlArrayElem(rv reflect.Value) string {
	for rv.Kind() == reflect.Interface || rv.Kind() == reflect.Pointer {
		if rv.IsNil() {
			return "NULL"
		}

		rv = rv.Elem()
	}

	if rv.Kind() == reflect.Slice {
		return sqlArray(rv)
	}

	s := fmt.Sprint(rv.Interface())
	if s != "" && !strings.ContainsAny(s, "{},\"\\ \t\r\n") {
		return s
	}

	return `"` + strings.NewReplacer(`\`, `\\`, `"`, `\"`).Replace(s) + `"`
}