diff --git a/zatomic/zatomic.go b/zatomic/zatomic.go index 2a6275c0ce184cec44715c77b552eb307feb690a..3b6f689677e20818d75a21e092a6144942c50bd5 100644 --- a/zatomic/zatomic.go +++ b/zatomic/zatomic.go @@ -18,6 +18,14 @@ func (i *Int) Add(delta int) int { return int(i.v.Add(int32(delta))) } +func (i *Int) Inc() int { + return i.Add(1) +} + +func (i *Int) Dec() int { + return i.Add(-1) +} + func (i *Int) Load() int { return int(i.v.Load()) } diff --git a/zq/zq.go b/zq/zq.go index e86c7785265b55172cd56a6fa28da0206b6982e2..8a9bf011e1515bfd1a6103774940a4036e7ef962 100644 --- a/zq/zq.go +++ b/zq/zq.go @@ -9,63 +9,63 @@ import ( type ZQ struct { WorkerCounter *zatomic.Int - WorkerFn func(any) + WorkerFn func(int, any) ZQuit *zquit.ZQuit + Jobs chan any + workerId *zatomic.Int complete chan bool - Jobs chan any } -func New(qtyWorkers int, buff int, fn func(any)) *ZQ { +func New(qtyWorkers int, buff int, fn func(int, any)) *ZQ { q := &ZQ{ WorkerCounter: zatomic.New(), WorkerFn: fn, ZQuit: zquit.Default(), Jobs: make(chan any, buff), complete: make(chan bool), + workerId: zatomic.New(), } - for i := 0; i < qtyWorkers; i++ { - q.AddWorker(1) - } + q.AddWorker(qtyWorkers) return q } -func Default(fn func(any)) *ZQ { +func Default(fn func(int, any)) *ZQ { return DefaultN(runtime.NumCPU(), fn) } -func DefaultN(qtyWorkers int, fn func(any)) *ZQ { +func DefaultN(qtyWorkers int, fn func(int, any)) *ZQ { return New(qtyWorkers, qtyWorkers*2, fn) } -func Default2(fn func(any)) *ZQ { +func Default2(fn func(int, any)) *ZQ { return DefaultN(2, fn) } -func Default4(fn func(any)) *ZQ { +func Default4(fn func(int, any)) *ZQ { return DefaultN(4, fn) } -func Default6(fn func(any)) *ZQ { +func Default6(fn func(int, any)) *ZQ { return DefaultN(6, fn) } -func Default8(fn func(any)) *ZQ { +func Default8(fn func(int, any)) *ZQ { return DefaultN(8, fn) } -func Default12(fn func(any)) *ZQ { +func Default12(fn func(int, any)) *ZQ { return DefaultN(12, fn) } -func Default16(fn func(any)) *ZQ { +func Default16(fn func(int, any)) *ZQ { return DefaultN(16, fn) } -func Default24(fn func(any)) *ZQ { +func Default24(fn func(int, any)) *ZQ { return DefaultN(24, fn) } -func Default32(fn func(any)) *ZQ { +func Default32(fn func(int, any)) *ZQ { return DefaultN(32, fn) } -func Default48(fn func(any)) *ZQ { +func Default48(fn func(int, any)) *ZQ { return DefaultN(48, fn) } -func Default64(fn func(any)) *ZQ { +func Default64(fn func(int, any)) *ZQ { return DefaultN(64, fn) } @@ -84,17 +84,18 @@ func (q *ZQ) AddWorker(delta int) { } for i := 0; i < delta; i++ { - q.WorkerCounter.Add(1) + q.WorkerCounter.Inc() go func() { - defer q.WorkerCounter.Add(-1) + defer q.WorkerCounter.Dec() + workerId := q.workerId.Inc() for { select { case <-q.complete: return case j := <-q.Jobs: - q.WorkerFn(j) + q.WorkerFn(workerId, j) q.ZQuit.Done() } }