package mq import ( "epur-pay/pkg/idGenerate" "epur-pay/pkg/rds" "epur-pay/pkg/utils" "strconv" "sync" "time" ) var AysncInstance *Async type Async struct { //Tasks AsyncTasks // 数据 model.AsyncTasks Queue *sync.Map // chan model.AsyncTask handlerCh chan []interface{} } type Task struct { Topics string `json:"topics"` // 主题 Delay int64 `json:"delay"` // 延迟秒数 Data []byte `json:"data"` // 数据 Ack bool `json:"bool"` // 是否Ack RetryCount int `json:"retryCount"` // 重试次数 Transaction bool `json:"transaction"` // 是否开启事物 Store bool `json:"store"` // 是否持久化 ChainErrorTriger bool `json:"chainErrorTriger"` // 是否错误继续调用 Chains []*Task `json:"-"` // 链式调用 Func func() `json:"-"` // 函数调用 } func New() *Async { AysncInstance = &Async{handlerCh: make(chan []interface{}, 1024*10), Queue: new(sync.Map)} return AysncInstance } func (p *Async) SetTopics(data ...string) *Async { for _, item := range data { ch := make(chan AsyncTask, 1024*10) AysncInstance.Queue.Store(item, ch) } return p } func (p *Async) Listen() { tasks := AsyncTasks{} utils.Error(rds.DB.Table(AsyncTask{}.TableName()).Where( "status in ?", []string{"2"}).Scan(&tasks).Error) for index := range tasks { tasks[index].Store = true AysncInstance.add(tasks[index]) } } func (p *Async) createTaskTo(task *Task, parentTask *AsyncTask) *AsyncTask { task1 := &AsyncTask{ Topics: task.Topics, Data: string(task.Data), Status: "2", Ack: task.Ack, Delay: task.Delay, CreateTime: utils.Time2StampSecond(), RetryCount: task.RetryCount, Transaction: task.Transaction, Store: task.Store, ChainErrorTriger: task.ChainErrorTriger, Func: task.Func, } if task1.Func != nil { task1.Store = false } if parentTask != nil { parentTask.Chains = append(parentTask.Chains, task1) task1.ParentId = parentTask.Id task1.Store = parentTask.Store task1.ChainErrorTriger = parentTask.ChainErrorTriger } if task1.Delay > 0 { task1.IsDelay = true task1.ExpireTime = task1.CreateTime + task1.Delay } if task1.RetryCount <= 0 { task1.RetryCount = 2 } if task1.Store { utils.Error(rds.DB.Create(task1).Error) } else { task1.Id, _ = strconv.ParseInt(idGenerate.ID.Generate(""), 10, 64) } for idx := range task.Chains { if task.Chains[idx] != nil { p.createTaskTo(task.Chains[idx], task1) } } return task1 } func (p *Async) Producer(task *Task) { task1 := p.createTaskTo(task, nil) if len(task1.Chains) > 0 { utils.DbErrSkipRecordNotFound(rds.DB.Save(task1).Error) } p.add(task1) } func (p *Async) Consumer(topics string, f ConsumerFunc, poolCount int) *Async { if _, ok := AysncInstance.Queue.Load(topics); !ok { ch := make(chan *AsyncTask, 1024*10) AysncInstance.Queue.Store(topics, ch) } else { return p } if poolCount <= 0 { poolCount = 1 } for i := 0; i < poolCount; i++ { go func(topics string, f ConsumerFunc) { if ch, ok := AysncInstance.Queue.Load(topics); ok { //logger.AccessLogger.Infoln("MQ订阅:", topics) for row := range ch.(chan *AsyncTask) { //logger.AccessLogger.Infoln("TaskId:", row.Id) go func(row *AsyncTask) { api := ConsumerApi{ F: f, A: row, P: p, } api.consumer() }(row) } } }(topics, f) } return p } func (p *Async) push(c chan *AsyncTask, task *AsyncTask) { c <- task } func (p *Async) add(task *AsyncTask) { queue, ok := p.Queue.Load(task.Topics) if !ok { return } if !task.IsDelay { p.push(queue.(chan *AsyncTask), task) } else if task.IsDelay && task.ExpireTime <= utils.Time2StampSecond() { p.push(queue.(chan *AsyncTask), task) } else { go func(task *AsyncTask, c chan *AsyncTask) { timer := time.NewTimer(time.Duration(task.Delay) * time.Second) select { case <-timer.C: p.push(queue.(chan *AsyncTask), task) } timer.Stop() }(task, queue.(chan *AsyncTask)) } }