zq.go 2.33 KB
Newer Older
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
1
2
3
package zq

import (
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
4
	"fmt"
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
5
	"git.barsukov.pro/barsukov/zgo/zatomic"
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
6
	"git.barsukov.pro/barsukov/zgo/zquit"
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
7
	"runtime"
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
8
9
10
11
)

type ZQ struct {
	WorkerCounter *zatomic.Int
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
12
	WorkerFn      func(int, any)
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
13
	ZQuit         *zquit.ZQuit
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
14
	Jobs          chan any
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
15

Vladimir Barsukov's avatar
lint    
Vladimir Barsukov committed
16
	workerID *zatomic.Int
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
17
18
19
	complete chan bool
}

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
20
func New(qtyWorkers int, buff int, fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
21
22
	q := &ZQ{
		WorkerCounter: zatomic.New(),
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
23
		WorkerFn:      fn,
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
24
		ZQuit:         zquit.Default(),
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
25
		Jobs:          make(chan any, buff),
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
26
		complete:      make(chan bool),
Vladimir Barsukov's avatar
lint    
Vladimir Barsukov committed
27
		workerID:      zatomic.New(),
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
28
29
	}

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
30
	q.AddWorker(qtyWorkers)
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
31
32
33

	return q
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
34
func Default(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
35
36
37
	return DefaultN(runtime.NumCPU(), fn)
}

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
38
func DefaultN(qtyWorkers int, fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
39
	return New(qtyWorkers, qtyWorkers*2, fn)
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
40
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
41
func Default2(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
42
43
	return DefaultN(2, fn)
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
44
func Default4(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
45
46
	return DefaultN(4, fn)
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
47
func Default6(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
48
49
	return DefaultN(6, fn)
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
50
func Default8(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
51
52
	return DefaultN(8, fn)
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
53
func Default12(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
54
55
	return DefaultN(12, fn)
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
56
func Default16(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
57
58
	return DefaultN(16, fn)
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
59
func Default24(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
60
61
	return DefaultN(24, fn)
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
62
func Default32(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
63
64
	return DefaultN(32, fn)
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
65
func Default48(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
66
67
	return DefaultN(48, fn)
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
68
func Default64(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
69
70
	return DefaultN(64, fn)
}
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
71

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
72
73
func (q *ZQ) WithZQuit(quit *zquit.ZQuit) *ZQ {
	q.ZQuit = quit
Vladimir Barsukov's avatar
lint    
Vladimir Barsukov committed
74

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
75
76
77
	return q
}

Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
78
79
80
81
82
83
84
85
86
87
func (q *ZQ) AddWorker(delta int) {
	if delta < 1 {
		for i := 0; i < delta*-1; i++ {
			q.complete <- true
		}

		return
	}

	for i := 0; i < delta; i++ {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
88
		q.WorkerCounter.Inc()
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
89
90

		go func() {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
91
			defer q.WorkerCounter.Dec()
Vladimir Barsukov's avatar
lint    
Vladimir Barsukov committed
92
			workerID := q.workerID.Inc()
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
93
94
95
96
97

			for {
				select {
				case <-q.complete:
					return
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
98
				case j := <-q.Jobs:
Vladimir Barsukov's avatar
lint    
Vladimir Barsukov committed
99
					q.WorkerFn(workerID, j)
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
100
					q.ZQuit.Done()
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
101
102
103
104
105
106
107
108
109
110
				}
			}
		}()
	}
}

func (q *ZQ) RemoveWorker() {
	q.AddWorker(-1)
}

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
111
112
113
114
115
116
func (q *ZQ) AddJob(a any) error {
	if q.ZQuit.IsQuit() {
		return fmt.Errorf("quit")
	}

	q.ZQuit.Inc()
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
117
	q.Jobs <- a
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
118

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
119
	return nil
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
120
121
}

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
122
123
func (q *ZQ) AddJobForce(a any) error {
	q.ZQuit.Inc()
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
124
	q.Jobs <- a
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
125
126
127
128

	return nil
}

Vladimir Barsukov's avatar
Vladimir Barsukov committed
129
130
131
132
133
func (q *ZQ) MustJob(a any) {
	q.ZQuit.Inc()
	q.Jobs <- a
}

Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
134
func (q *ZQ) Wait() {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
135
	q.ZQuit.Wait()
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
136
137
138
139
140
141
}

func (q *ZQ) Quit() {
	q.Wait()
	q.AddWorker(q.WorkerCounter.LoadInverse())
}