Commit 4ea14785 authored by Vladimir Barsukov's avatar Vladimir Barsukov
Browse files

fix

parent 086a1db2
...@@ -13,7 +13,7 @@ type ZQ struct { ...@@ -13,7 +13,7 @@ type ZQ struct {
ZQuit *zquit.ZQuit ZQuit *zquit.ZQuit
complete chan bool complete chan bool
jobs chan any Jobs chan any
} }
func New(qtyWorkers int, buff int, fn func(any)) *ZQ { func New(qtyWorkers int, buff int, fn func(any)) *ZQ {
...@@ -21,7 +21,7 @@ func New(qtyWorkers int, buff int, fn func(any)) *ZQ { ...@@ -21,7 +21,7 @@ func New(qtyWorkers int, buff int, fn func(any)) *ZQ {
WorkerCounter: zatomic.New(), WorkerCounter: zatomic.New(),
WorkerFn: fn, WorkerFn: fn,
ZQuit: zquit.Default(), ZQuit: zquit.Default(),
jobs: make(chan any, buff), Jobs: make(chan any, buff),
complete: make(chan bool), complete: make(chan bool),
} }
...@@ -36,7 +36,7 @@ func Default(fn func(any)) *ZQ { ...@@ -36,7 +36,7 @@ func Default(fn func(any)) *ZQ {
} }
func DefaultN(qtyWorkers int, fn func(any)) *ZQ { func DefaultN(qtyWorkers int, fn func(any)) *ZQ {
return New(qtyWorkers, 0, fn) return New(qtyWorkers, qtyWorkers*2, fn)
} }
func Default2(fn func(any)) *ZQ { func Default2(fn func(any)) *ZQ {
return DefaultN(2, fn) return DefaultN(2, fn)
...@@ -93,7 +93,7 @@ func (q *ZQ) AddWorker(delta int) { ...@@ -93,7 +93,7 @@ func (q *ZQ) AddWorker(delta int) {
select { select {
case <-q.complete: case <-q.complete:
return return
case j := <-q.jobs: case j := <-q.Jobs:
q.WorkerFn(j) q.WorkerFn(j)
q.ZQuit.Done() q.ZQuit.Done()
} }
...@@ -112,14 +112,14 @@ func (q *ZQ) AddJob(a any) error { ...@@ -112,14 +112,14 @@ func (q *ZQ) AddJob(a any) error {
} }
q.ZQuit.Inc() q.ZQuit.Inc()
q.jobs <- a q.Jobs <- a
return nil return nil
} }
func (q *ZQ) AddJobForce(a any) error { func (q *ZQ) AddJobForce(a any) error {
q.ZQuit.Inc() q.ZQuit.Inc()
q.jobs <- a q.Jobs <- a
return nil return nil
} }
......
...@@ -44,10 +44,6 @@ func (p *Pool) PrintStat(sec int) { ...@@ -44,10 +44,6 @@ func (p *Pool) PrintStat(sec int) {
} }
func (p *Pool) Wait() { func (p *Pool) Wait() {
for _, i := range p.items {
i.Shutdown()
}
for _, i := range p.items { for _, i := range p.items {
i.Wait() i.Wait()
} }
...@@ -62,6 +58,10 @@ func (p *Pool) WaitInterruptPrePost(pre func(), post func()) { ...@@ -62,6 +58,10 @@ func (p *Pool) WaitInterruptPrePost(pre func(), post func()) {
pre() pre()
} }
for _, i := range p.items {
i.Shutdown()
}
p.Wait() p.Wait()
time.Sleep(p.PostWaitDur) time.Sleep(p.PostWaitDur)
......
...@@ -45,8 +45,6 @@ func (q *ZQuit) Done() { ...@@ -45,8 +45,6 @@ func (q *ZQuit) Done() {
} }
func (q *ZQuit) Wait() { func (q *ZQuit) Wait() {
q.Shutdown()
q.wg.Wait() q.wg.Wait()
} }
...@@ -71,6 +69,7 @@ func (q *ZQuit) WaitInterruptPrePost(pre func(), post func()) { ...@@ -71,6 +69,7 @@ func (q *ZQuit) WaitInterruptPrePost(pre func(), post func()) {
pre() pre()
} }
q.Shutdown()
q.Wait() q.Wait()
time.Sleep(q.PostWaitDur) time.Sleep(q.PostWaitDur)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment