Commit d34b26e7 authored by Vladimir Barsukov's avatar Vladimir Barsukov
Browse files

fix

parent 92afcbbd
...@@ -55,7 +55,7 @@ func (d *Pool) qAny(q *Conn, dst any, sql string, args ...any) error { ...@@ -55,7 +55,7 @@ func (d *Pool) qAny(q *Conn, dst any, sql string, args ...any) error {
} else if f.DataTypeOID == pgtype.NumericOID { } else if f.DataTypeOID == pgtype.NumericOID {
ff, _ := v[i].(pgtype.Numeric).Float64Value() ff, _ := v[i].(pgtype.Numeric).Float64Value()
r[f.Name] = ff.Float64 r[f.Name] = ff.Float64
} else if f.DataTypeOID == pgtype.TimestampOID && d.UseGoTime == false { } else if f.DataTypeOID == pgtype.TimestampOID && !d.UseGoTime {
r[f.Name] = v[i].(time.Time).Format(d.PgTsFormat) r[f.Name] = v[i].(time.Time).Format(d.PgTsFormat)
} else if f.DataTypeOID == pgtype.UUIDOID { } else if f.DataTypeOID == pgtype.UUIDOID {
bb := v[i].([16]byte) bb := v[i].([16]byte)
......
...@@ -354,14 +354,14 @@ func (d *Pool) IsAlive() bool { ...@@ -354,14 +354,14 @@ func (d *Pool) IsAlive() bool {
func (d *Pool) prepare(sql string, param map[string]any) string { func (d *Pool) prepare(sql string, param map[string]any) string {
for n, t := range param { for n, t := range param {
switch t.(type) { switch tv := t.(type) {
case time.Time: case time.Time:
sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("'%s'", t.(time.Time).UTC().Format(time.DateTime))) sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("'%s'", tv.UTC().Format(time.DateTime)))
case *time.Time: case *time.Time:
if t.(*time.Time) == nil { if tv == nil {
sql = strings.ReplaceAll(sql, ":"+n+":", "NULL") sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
} else { } else {
sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("'%s'", t.(*time.Time).UTC().Format(time.DateTime))) sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("'%s'", tv.UTC().Format(time.DateTime)))
} }
case nil: case nil:
sql = strings.ReplaceAll(sql, ":"+n+":", "NULL") sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
...@@ -381,35 +381,35 @@ func (d *Pool) prepare(sql string, param map[string]any) string { ...@@ -381,35 +381,35 @@ func (d *Pool) prepare(sql string, param map[string]any) string {
if t.(string) == "NULL" { if t.(string) == "NULL" {
sql = strings.ReplaceAll(sql, ":"+n+":", "NULL") sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
} else { } else {
sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("'%v'", strings.ReplaceAll(t.(string), "'", "''"))) sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("'%v'", strings.ReplaceAll(tv, "'", "''")))
} }
case *int: case *int:
if t.(*int) == nil { if t.(*int) == nil {
sql = strings.ReplaceAll(sql, ":"+n+":", "NULL") sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
} else { } else {
sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("%v", *t.(*int))) sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("%v", *tv))
} }
case *bool: case *bool:
if t.(*bool) == nil { if tv == nil {
sql = strings.ReplaceAll(sql, ":"+n+":", "NULL") sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
} else { } else {
if *t.(*bool) { if *tv {
sql = strings.ReplaceAll(sql, ":"+n+":", "TRUE") sql = strings.ReplaceAll(sql, ":"+n+":", "TRUE")
} else { } else {
sql = strings.ReplaceAll(sql, ":"+n+":", "FALSE") sql = strings.ReplaceAll(sql, ":"+n+":", "FALSE")
} }
} }
case *int64: case *int64:
if t.(*int64) == nil { if tv == nil {
sql = strings.ReplaceAll(sql, ":"+n+":", "NULL") sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
} else { } else {
sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("%v", *t.(*int64))) sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("%v", *tv))
} }
case *float64: case *float64:
if t.(*float64) == nil { if tv == nil {
sql = strings.ReplaceAll(sql, ":"+n+":", "NULL") sql = strings.ReplaceAll(sql, ":"+n+":", "NULL")
} else { } else {
sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("%v", *t.(*float64))) sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("%v", *tv))
} }
case int, int64: case int, int64:
sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("%v", t)) sql = strings.ReplaceAll(sql, ":"+n+":", fmt.Sprintf("%v", t))
......
package zq package zq
import ( import (
"fmt"
"git.barsukov.pro/barsukov/zgo/zatomic" "git.barsukov.pro/barsukov/zgo/zatomic"
"git.barsukov.pro/barsukov/zgo/zwg" "git.barsukov.pro/barsukov/zgo/zquit"
"runtime" "runtime"
) )
type ZQ struct { type ZQ struct {
WorkerCounter *zatomic.Int WorkerCounter *zatomic.Int
WorkerFn func(any) WorkerFn func(any)
JobsWg *zwg.Zwg ZQuit *zquit.ZQuit
complete chan bool complete chan bool
jobs chan any jobs chan any
...@@ -19,7 +20,7 @@ func New(qtyWorkers int, buff int, fn func(any)) *ZQ { ...@@ -19,7 +20,7 @@ func New(qtyWorkers int, buff int, fn func(any)) *ZQ {
q := &ZQ{ q := &ZQ{
WorkerCounter: zatomic.New(), WorkerCounter: zatomic.New(),
WorkerFn: fn, WorkerFn: fn,
JobsWg: zwg.New(), ZQuit: zquit.Default(),
jobs: make(chan any, buff), jobs: make(chan any, buff),
complete: make(chan bool), complete: make(chan bool),
} }
...@@ -68,6 +69,11 @@ func Default64(fn func(any)) *ZQ { ...@@ -68,6 +69,11 @@ func Default64(fn func(any)) *ZQ {
return DefaultN(64, fn) return DefaultN(64, fn)
} }
func (q *ZQ) WithZQuit(quit *zquit.ZQuit) *ZQ {
q.ZQuit = quit
return q
}
func (q *ZQ) AddWorker(delta int) { func (q *ZQ) AddWorker(delta int) {
if delta < 1 { if delta < 1 {
for i := 0; i < delta*-1; i++ { for i := 0; i < delta*-1; i++ {
...@@ -89,7 +95,7 @@ func (q *ZQ) AddWorker(delta int) { ...@@ -89,7 +95,7 @@ func (q *ZQ) AddWorker(delta int) {
return return
case j := <-q.jobs: case j := <-q.jobs:
q.WorkerFn(j) q.WorkerFn(j)
q.JobsWg.Done() q.ZQuit.Done()
} }
} }
}() }()
...@@ -100,15 +106,19 @@ func (q *ZQ) RemoveWorker() { ...@@ -100,15 +106,19 @@ func (q *ZQ) RemoveWorker() {
q.AddWorker(-1) q.AddWorker(-1)
} }
func (q *ZQ) AddJob(a any) *ZQ { func (q *ZQ) AddJob(a any) error {
q.JobsWg.Inc() if q.ZQuit.IsQuit() {
return fmt.Errorf("quit")
}
q.ZQuit.Inc()
q.jobs <- a q.jobs <- a
return q return nil
} }
func (q *ZQ) Wait() { func (q *ZQ) Wait() {
q.JobsWg.Wait() q.ZQuit.Wait()
} }
func (q *ZQ) Quit() { func (q *ZQ) Quit() {
......
package zquit package zquit
import ( import (
"log"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
...@@ -28,13 +29,25 @@ func (p *Pool) Add(z *ZQuit) { ...@@ -28,13 +29,25 @@ func (p *Pool) Add(z *ZQuit) {
p.items = append(p.items, z) p.items = append(p.items, z)
} }
func (p *Pool) PrintStat() { func (p *Pool) PrintStat(sec int) {
for _, i := range p.items { go func() {
i.PrintStat() for {
} <-time.After(time.Second * time.Duration(sec))
s := ""
for _, i := range p.items {
s += i.GetStat() + "; "
}
log.Printf("Z_QUIT: %s", s)
}
}()
} }
func (p *Pool) Wait() { func (p *Pool) Wait() {
for _, i := range p.items {
i.Shutdown()
}
for _, i := range p.items { for _, i := range p.items {
i.Wait() i.Wait()
} }
......
package zquit package zquit
import ( import (
"fmt"
"git.barsukov.pro/barsukov/zgo/zwg" "git.barsukov.pro/barsukov/zgo/zwg"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"log" "log"
...@@ -95,11 +96,14 @@ func (q *ZQuit) Middleware(c *gin.Context) { ...@@ -95,11 +96,14 @@ func (q *ZQuit) Middleware(c *gin.Context) {
c.Next() c.Next()
} }
func (q *ZQuit) PrintStat() { func (q *ZQuit) GetStat() string {
return fmt.Sprintf("%s: %v", q.Name, q.Val())
}
func (q *ZQuit) PrintStat(sec int) {
go func() { go func() {
for { for {
<-time.After(time.Second) <-time.After(time.Second * time.Duration(sec))
log.Printf("Z_QUIT: LOCKS[%s]: %v", q.Name, q.Val()) log.Printf("Z_QUIT: %s", q.GetStat())
} }
}() }()
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment