package zq import ( "fmt" "git.barsukov.pro/barsukov/zgo/zatomic" "git.barsukov.pro/barsukov/zgo/zquit" "runtime" ) type ZQ struct { WorkerCounter *zatomic.Int WorkerFn func(any) ZQuit *zquit.ZQuit complete chan bool jobs chan any } func New(qtyWorkers int, buff int, fn func(any)) *ZQ { q := &ZQ{ WorkerCounter: zatomic.New(), WorkerFn: fn, ZQuit: zquit.Default(), jobs: make(chan any, buff), complete: make(chan bool), } for i := 0; i < qtyWorkers; i++ { q.AddWorker(1) } return q } func Default(fn func(any)) *ZQ { return DefaultN(runtime.NumCPU(), fn) } func DefaultN(qtyWorkers int, fn func(any)) *ZQ { return New(qtyWorkers, 0, fn) } func Default2(fn func(any)) *ZQ { return DefaultN(2, fn) } func Default4(fn func(any)) *ZQ { return DefaultN(4, fn) } func Default6(fn func(any)) *ZQ { return DefaultN(6, fn) } func Default8(fn func(any)) *ZQ { return DefaultN(8, fn) } func Default12(fn func(any)) *ZQ { return DefaultN(12, fn) } func Default16(fn func(any)) *ZQ { return DefaultN(16, fn) } func Default24(fn func(any)) *ZQ { return DefaultN(24, fn) } func Default32(fn func(any)) *ZQ { return DefaultN(32, fn) } func Default48(fn func(any)) *ZQ { return DefaultN(48, fn) } func Default64(fn func(any)) *ZQ { return DefaultN(64, fn) } func (q *ZQ) WithZQuit(quit *zquit.ZQuit) *ZQ { q.ZQuit = quit return q } func (q *ZQ) AddWorker(delta int) { if delta < 1 { for i := 0; i < delta*-1; i++ { q.complete <- true } return } for i := 0; i < delta; i++ { q.WorkerCounter.Add(1) go func() { defer q.WorkerCounter.Add(-1) for { select { case <-q.complete: return case j := <-q.jobs: q.WorkerFn(j) q.ZQuit.Done() } } }() } } func (q *ZQ) RemoveWorker() { q.AddWorker(-1) } func (q *ZQ) AddJob(a any) error { if q.ZQuit.IsQuit() { return fmt.Errorf("quit") } q.ZQuit.Inc() q.jobs <- a return nil } func (q *ZQ) Wait() { q.ZQuit.Wait() } func (q *ZQ) Quit() { q.Wait() q.AddWorker(q.WorkerCounter.LoadInverse()) }