zq.go 2.26 KB
Newer Older
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
1
2
3
package zq

import (
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
4
	"fmt"
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
5
	"git.barsukov.pro/barsukov/zgo/zatomic"
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
6
	"git.barsukov.pro/barsukov/zgo/zquit"
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
7
	"runtime"
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
8
9
10
11
)

type ZQ struct {
	WorkerCounter *zatomic.Int
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
12
	WorkerFn      func(int, any)
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
13
	ZQuit         *zquit.ZQuit
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
14
	Jobs          chan any
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
15

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
16
	workerId *zatomic.Int
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
17
18
19
	complete chan bool
}

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
20
func New(qtyWorkers int, buff int, fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
21
22
	q := &ZQ{
		WorkerCounter: zatomic.New(),
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
23
		WorkerFn:      fn,
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
24
		ZQuit:         zquit.Default(),
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
25
		Jobs:          make(chan any, buff),
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
26
		complete:      make(chan bool),
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
27
		workerId:      zatomic.New(),
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
28
29
	}

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
30
	q.AddWorker(qtyWorkers)
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
31
32
33

	return q
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
34
func Default(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
35
36
37
	return DefaultN(runtime.NumCPU(), fn)
}

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
38
func DefaultN(qtyWorkers int, fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
39
	return New(qtyWorkers, qtyWorkers*2, fn)
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
40
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
41
func Default2(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
42
43
	return DefaultN(2, fn)
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
44
func Default4(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
45
46
	return DefaultN(4, fn)
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
47
func Default6(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
48
49
	return DefaultN(6, fn)
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
50
func Default8(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
51
52
	return DefaultN(8, fn)
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
53
func Default12(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
54
55
	return DefaultN(12, fn)
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
56
func Default16(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
57
58
	return DefaultN(16, fn)
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
59
func Default24(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
60
61
	return DefaultN(24, fn)
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
62
func Default32(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
63
64
	return DefaultN(32, fn)
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
65
func Default48(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
66
67
	return DefaultN(48, fn)
}
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
68
func Default64(fn func(int, any)) *ZQ {
Vladimir Barsukov's avatar
zq buff    
Vladimir Barsukov committed
69
70
	return DefaultN(64, fn)
}
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
71

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
72
73
74
75
76
func (q *ZQ) WithZQuit(quit *zquit.ZQuit) *ZQ {
	q.ZQuit = quit
	return q
}

Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
77
78
79
80
81
82
83
84
85
86
func (q *ZQ) AddWorker(delta int) {
	if delta < 1 {
		for i := 0; i < delta*-1; i++ {
			q.complete <- true
		}

		return
	}

	for i := 0; i < delta; i++ {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
87
		q.WorkerCounter.Inc()
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
88
89

		go func() {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
90
91
			defer q.WorkerCounter.Dec()
			workerId := q.workerId.Inc()
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
92
93
94
95
96

			for {
				select {
				case <-q.complete:
					return
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
97
				case j := <-q.Jobs:
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
98
					q.WorkerFn(workerId, j)
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
99
					q.ZQuit.Done()
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
100
101
102
103
104
105
106
107
108
109
				}
			}
		}()
	}
}

func (q *ZQ) RemoveWorker() {
	q.AddWorker(-1)
}

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
110
111
112
113
114
115
func (q *ZQ) AddJob(a any) error {
	if q.ZQuit.IsQuit() {
		return fmt.Errorf("quit")
	}

	q.ZQuit.Inc()
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
116
	q.Jobs <- a
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
117

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
118
	return nil
Vladimir Barsukov's avatar
zgo    
Vladimir Barsukov committed
119
120
}

Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
121
122
func (q *ZQ) AddJobForce(a any) error {
	q.ZQuit.Inc()
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
123
	q.Jobs <- a
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
124
125
126
127

	return nil
}

Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
128
func (q *ZQ) Wait() {
Vladimir Barsukov's avatar
fix    
Vladimir Barsukov committed
129
	q.ZQuit.Wait()
Vladimir Barsukov's avatar
zgo  
Vladimir Barsukov committed
130
131
132
133
134
135
}

func (q *ZQ) Quit() {
	q.Wait()
	q.AddWorker(q.WorkerCounter.LoadInverse())
}