// Package zq provides a small job queue served by a resizable pool of worker
// goroutines.
package zq

import (
	"fmt"
	"runtime"

	"git.barsukov.pro/barsukov/zgo/zatomic"
	"git.barsukov.pro/barsukov/zgo/zquit"
)

// ZQ is a job queue whose jobs are consumed by worker goroutines started with
// AddWorker. Create instances with New, Default or DefaultN; the zero value
// has nil channels and counters and is not usable.
type ZQ struct {
	// WorkerCounter is incremented for every worker started and decremented
	// when a worker goroutine exits.
	WorkerCounter *zatomic.Int
	// WorkerFn is invoked for every job with the worker's ID and the job value.
	WorkerFn func(int, any)
	// ZQuit is incremented for every job enqueued and marked Done after
	// WorkerFn returns for that job; Wait blocks on it.
	ZQuit *zquit.ZQuit
	// Jobs carries enqueued jobs to the workers.
	Jobs chan any

	workerID *zatomic.Int // each worker takes its ID from workerID.Inc()
	complete chan bool    // one value is sent per worker that should stop
}

// New creates a queue with a Jobs channel of capacity buff and starts
// qtyWorkers workers that call fn for every job. A qtyWorkers below 1 starts
// no workers.
func New(qtyWorkers int, buff int, fn func(int, any)) *ZQ {
	q := &ZQ{
		WorkerCounter: zatomic.New(),
		WorkerFn:      fn,
		ZQuit:         zquit.Default(),
		Jobs:          make(chan any, buff),
		complete:      make(chan bool),
		workerID:      zatomic.New(),
	}
	q.AddWorker(qtyWorkers)
	return q
}

// Default creates a queue with one worker per CPU reported by runtime.NumCPU.
func Default(fn func(int, any)) *ZQ {
	return DefaultN(runtime.NumCPU(), fn)
}

// DefaultN creates a queue with qtyWorkers workers and a job buffer twice
// that size.
func DefaultN(qtyWorkers int, fn func(int, any)) *ZQ {
	return New(qtyWorkers, qtyWorkers*2, fn)
}

// WithZQuit replaces the queue's ZQuit with quit and returns q so calls can be
// chained.
func (q *ZQ) WithZQuit(quit *zquit.ZQuit) *ZQ {
	q.ZQuit = quit
	return q
}

// AddWorker changes the size of the worker pool by delta.
//
// A positive delta starts that many new workers. A delta below 1 asks -delta
// workers to stop, blocking until each stop signal has been received by a
// worker. The number of stop signals is capped at the current worker count:
// complete is unbuffered, so a signal with no worker left to receive it would
// block the caller forever.
func (q *ZQ) AddWorker(delta int) {
	if delta < 1 {
		stop := -delta
		// LoadInverse is the counter accessor Quit already relies on to get
		// the negated worker count; negate it back to obtain the count.
		// NOTE(review): the cap is a snapshot; concurrent removals can still
		// over-request. Serialize pool resizing if that matters.
		if running := -q.WorkerCounter.LoadInverse(); stop > running {
			stop = running
		}
		for i := 0; i < stop; i++ {
			q.complete <- true
		}
		return
	}

	for i := 0; i < delta; i++ {
		// Count the worker before its goroutine is scheduled so a removal
		// issued right after AddWorker returns already sees it.
		q.WorkerCounter.Inc()
		go q.runWorker()
	}
}

// runWorker is the body of one worker goroutine: it processes jobs until it
// receives a stop signal on complete.
func (q *ZQ) runWorker() {
	defer q.WorkerCounter.Dec()

	workerID := q.workerID.Inc()
	for {
		select {
		case <-q.complete:
			return
		case j := <-q.Jobs:
			q.WorkerFn(workerID, j)
			q.ZQuit.Done()
		}
	}
}

// RemoveWorker asks one worker to stop. It is a no-op when no workers are
// running.
func (q *ZQ) RemoveWorker() {
	q.AddWorker(-1)
}

// AddJob enqueues a, blocking while the Jobs buffer is full. It returns an
// error without enqueuing when ZQuit reports that quit was requested.
func (q *ZQ) AddJob(a any) error {
	if q.ZQuit.IsQuit() {
		return fmt.Errorf("quit")
	}
	q.ZQuit.Inc()
	q.Jobs <- a
	return nil
}

// AddJobForce enqueues a without checking ZQuit's quit state, blocking while
// the Jobs buffer is full. It always returns nil.
func (q *ZQ) AddJobForce(a any) error {
	q.ZQuit.Inc()
	q.Jobs <- a
	return nil
}

// MustJob enqueues a without checking ZQuit's quit state, blocking while the
// Jobs buffer is full. It behaves like AddJobForce but returns nothing.
func (q *ZQ) MustJob(a any) {
	q.ZQuit.Inc()
	q.Jobs <- a
}

// Wait blocks until ZQuit.Wait returns.
func (q *ZQ) Wait() {
	q.ZQuit.Wait()
}

// Quit waits via Wait and then asks every worker counted by WorkerCounter to
// stop.
func (q *ZQ) Quit() {
	q.Wait()
	q.AddWorker(q.WorkerCounter.LoadInverse())
}