zq.go 1.68 KB
Newer Older
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
1
2
3
package zq

import (
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
4
	"fmt"
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
5
	"git.barsukov.pro/barsukov/zgo/zatomic"
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
6
	"git.barsukov.pro/barsukov/zgo/zquit"
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
7
	"runtime"
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
8
9
10
11
)

type ZQ struct {
	WorkerCounter *zatomic.Int
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
12
	WorkerFn      func(int, any)
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
13
	ZQuit         *zquit.ZQuit
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
14
	Jobs          chan any
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
15

Vladimir Barsukov's avatar
lint    
Vladimir Barsukov committed
16
	workerID *zatomic.Int
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
17
18
19
	complete chan bool
}

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
20
func New(qtyWorkers int, buff int, fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
21
22
	q := &ZQ{
		WorkerCounter: zatomic.New(),
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
23
		WorkerFn:      fn,
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
24
		ZQuit:         zquit.Default(),
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
25
		Jobs:          make(chan any, buff),
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
26
		complete:      make(chan bool),
Vladimir Barsukov's avatar
lint    
Vladimir Barsukov committed
27
		workerID:      zatomic.New(),
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
28
29
	}

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
30
	q.AddWorker(qtyWorkers)
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
31
32
33

	return q
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
34
func Default(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
35
36
37
	return DefaultN(runtime.NumCPU(), fn)
}

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
38
func DefaultN(qtyWorkers int, fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
39
	return New(qtyWorkers, qtyWorkers*2, fn)
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
40
}
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
41

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
42
43
func (q *ZQ) WithZQuit(quit *zquit.ZQuit) *ZQ {
	q.ZQuit = quit
Vladimir Barsukov's avatar
lint    
Vladimir Barsukov committed
44

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
45
46
47
	return q
}

Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
48
49
50
51
52
53
54
55
56
57
func (q *ZQ) AddWorker(delta int) {
	if delta < 1 {
		for i := 0; i < delta*-1; i++ {
			q.complete <- true
		}

		return
	}

	for i := 0; i < delta; i++ {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
58
		q.WorkerCounter.Inc()
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
59
60

		go func() {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
61
			defer q.WorkerCounter.Dec()
Vladimir Barsukov's avatar
lint    
Vladimir Barsukov committed
62
			workerID := q.workerID.Inc()
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
63
64
65
66
67

			for {
				select {
				case <-q.complete:
					return
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
68
				case j := <-q.Jobs:
Vladimir Barsukov's avatar
lint    
Vladimir Barsukov committed
69
					q.WorkerFn(workerID, j)
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
70
					q.ZQuit.Done()
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
71
72
73
74
75
76
77
78
79
80
				}
			}
		}()
	}
}

// RemoveWorker asks a single worker to stop by calling AddWorker with a
// delta of -1.
func (q *ZQ) RemoveWorker() {
	const oneLess = -1

	q.AddWorker(oneLess)
}

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
81
82
83
84
85
86
func (q *ZQ) AddJob(a any) error {
	if q.ZQuit.IsQuit() {
		return fmt.Errorf("quit")
	}

	q.ZQuit.Inc()
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
87
	q.Jobs <- a
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
88

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
89
	return nil
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
90
91
}

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
92
93
func (q *ZQ) AddJobForce(a any) error {
	q.ZQuit.Inc()
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
94
	q.Jobs <- a
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
95
96
97
98

	return nil
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
99
100
101
102
103
// MustJob enqueues a without consulting ZQuit.IsQuit and without returning an
// error. It blocks while Jobs is full.
func (q *ZQ) MustJob(a any) {
	// Inc happens before the send, so the Done a worker issues for this job
	// can never precede it.
	pending := q.ZQuit
	pending.Inc()

	q.Jobs <- a
}

Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
104
func (q *ZQ) Wait() {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
105
	q.ZQuit.Wait()
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
106
107
108
109
110
111
}

// Quit calls Wait and then resizes the pool by WorkerCounter.LoadInverse().
// NOTE(review): LoadInverse presumably yields the negated worker count, which
// would make AddWorker stop every running worker; confirm against zatomic.
func (q *ZQ) Quit() {
	q.Wait()

	delta := q.WorkerCounter.LoadInverse()
	q.AddWorker(delta)
}