You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

177 lines
4.2 KiB

package mq
import (
"epur-pay/pkg/idGenerate"
"epur-pay/pkg/rds"
"epur-pay/pkg/utils"
"strconv"
"sync"
"time"
)
// AysncInstance is the package-level Async instance; New assigns it.
var AysncInstance *Async
// Async is an in-process task queue that keeps one channel per topic.
type Async struct {
//Tasks AsyncTasks // data: model.AsyncTasks
// Queue maps a topic name to that topic's task channel; add and Consumer
// read the stored values as chan *AsyncTask.
Queue *sync.Map // chan model.AsyncTask
// handlerCh is a buffered channel created by New.
// NOTE(review): no use of it is visible in this part of the file — confirm it is still needed.
handlerCh chan []interface{}
}
// Task describes a unit of work handed to Producer; createTaskTo copies its
// fields into an AsyncTask.
type Task struct {
Topics string `json:"topics"` // topic the task is queued on
Delay int64 `json:"delay"` // delay in seconds
Data []byte `json:"data"` // payload
// NOTE(review): the JSON key below is "bool", not "ack" — looks like a typo;
// confirm before changing it, since that alters the serialized form.
Ack bool `json:"bool"` // whether to ack
RetryCount int `json:"retryCount"` // retry count; createTaskTo uses 2 when this is <= 0
Transaction bool `json:"transaction"` // whether to enable a transaction
Store bool `json:"store"` // whether to persist the task
ChainErrorTriger bool `json:"chainErrorTriger"` // whether to keep invoking the chain after an error
Chains []*Task `json:"-"` // chained tasks, converted as children by createTaskTo
Func func() `json:"-"` // function call; a task with Func set is not persisted unless a parent overrides Store
}
// New builds an Async with an empty topic map and a buffered handler channel,
// installs it as the package-level AysncInstance, and returns it.
func New() *Async {
	instance := &Async{
		Queue:     &sync.Map{},
		handlerCh: make(chan []interface{}, 10*1024),
	}
	AysncInstance = instance
	return AysncInstance
}
// SetTopics pre-registers a buffered queue for every topic name in data and
// returns the receiver so calls can be chained.
//
// Each queue is created as chan *AsyncTask, the channel type that add and
// Consumer type-assert when they load a queue from Queue; storing any other
// channel type under a topic breaks those assertions.
//
// NOTE(review): Consumer returns early when a topic is already present in
// Queue, so a topic registered here gets no workers from a later Consumer
// call — confirm that this is the intended use.
func (p *Async) SetTopics(data ...string) *Async {
	for _, topic := range data {
		AysncInstance.Queue.Store(topic, make(chan *AsyncTask, 1024*10))
	}
	return p
}
// Listen loads every row of the AsyncTask table whose status is "2" (the
// status createTaskTo gives to new tasks), marks each loaded task as
// persisted, and re-queues it through add.
func (p *Async) Listen() {
	pending := AsyncTasks{}

	query := rds.DB.Table(AsyncTask{}.TableName()).Where("status in ?", []string{"2"})
	utils.Error(query.Scan(&pending).Error)

	for i := range pending {
		pending[i].Store = true
		AysncInstance.add(pending[i])
	}
}
// createTaskTo converts task into an AsyncTask, links it under parentTask when
// one is given, and recursively converts the non-nil entries of task.Chains as
// children of the new task. It returns the new AsyncTask.
//
// Rules applied, in this order:
//   - a task carrying Func has Store forced to false;
//   - a child takes Store and ChainErrorTriger from its parent, overriding its
//     own values (including the Func rule above);
//   - a positive Delay marks the task as delayed and sets ExpireTime to
//     CreateTime + Delay;
//   - RetryCount becomes 2 when it is not positive;
//   - a task with Store set is inserted through rds.DB.Create, any other task
//     gets its Id from idGenerate.
func (p *Async) createTaskTo(task *Task, parentTask *AsyncTask) *AsyncTask {
	created := &AsyncTask{
		Topics:           task.Topics,
		Data:             string(task.Data),
		Status:           "2",
		Ack:              task.Ack,
		Delay:            task.Delay,
		CreateTime:       utils.Time2StampSecond(),
		RetryCount:       task.RetryCount,
		Transaction:      task.Transaction,
		Store:            task.Store,
		ChainErrorTriger: task.ChainErrorTriger,
		Func:             task.Func,
	}

	// Tasks carrying a callback are kept out of the database
	// (presumably because Func cannot be serialized — TODO confirm).
	if created.Func != nil {
		created.Store = false
	}

	if parentTask != nil {
		parentTask.Chains = append(parentTask.Chains, created)
		// NOTE(review): for a persisted parent this relies on Create having
		// filled parentTask.Id — confirm against the AsyncTask model.
		created.ParentId = parentTask.Id
		// The parent's settings win over the child's own, so a child with Func
		// is still persisted when its parent is.
		created.Store = parentTask.Store
		created.ChainErrorTriger = parentTask.ChainErrorTriger
	}

	if created.Delay > 0 {
		created.IsDelay = true
		created.ExpireTime = created.CreateTime + created.Delay
	}

	if created.RetryCount <= 0 {
		created.RetryCount = 2
	}

	if created.Store {
		utils.Error(rds.DB.Create(created).Error)
	} else {
		// Report a generated ID that is not a valid integer instead of
		// silently leaving Id at the value ParseInt returns on failure.
		id, err := strconv.ParseInt(idGenerate.ID.Generate(""), 10, 64)
		utils.Error(err)
		created.Id = id
	}

	// Children are built only after the parent has its Id, so ParentId is set.
	for _, child := range task.Chains {
		if child != nil {
			p.createTaskTo(child, created)
		}
	}

	return created
}
// Producer converts task and its chain into AsyncTasks and queues the root
// task on its topic. A nil task is ignored.
//
// createTaskTo inserts a persisted root before its Chains are attached, so a
// persisted root that ends up with children is saved once more. A root that is
// not persisted (Store is false) is never passed to Save, which would
// otherwise write a task that was asked to stay out of the database.
func (p *Async) Producer(task *Task) {
	if task == nil {
		return
	}

	root := p.createTaskTo(task, nil)
	if root.Store && len(root.Chains) > 0 {
		utils.DbErrSkipRecordNotFound(rds.DB.Save(root).Error)
	}
	p.add(root)
}
// Consumer registers a queue for topics and starts poolCount receiver
// goroutines (at least one) that hand every received task to f through
// ConsumerApi. It returns the receiver so calls can be chained. When the topic
// already has a queue the call does nothing.
//
// Registration uses LoadOrStore so that concurrent calls for the same topic
// cannot both pass the existence check and overwrite each other's queue,
// which would leave one set of receivers reading a channel nobody writes to.
//
// Every received task runs in its own goroutine, so poolCount sets the number
// of receivers, not an upper bound on tasks running at the same time.
func (p *Async) Consumer(topics string, f ConsumerFunc, poolCount int) *Async {
	ch := make(chan *AsyncTask, 1024*10)
	if _, loaded := AysncInstance.Queue.LoadOrStore(topics, ch); loaded {
		return p
	}

	if poolCount <= 0 {
		poolCount = 1
	}

	for i := 0; i < poolCount; i++ {
		// Receivers get the queue directly instead of looking it up again.
		go func(queue <-chan *AsyncTask) {
			for row := range queue {
				go func(row *AsyncTask) {
					api := ConsumerApi{
						F: f,
						A: row,
						P: p,
					}
					api.consumer()
				}(row)
			}
		}(ch)
	}
	return p
}
// push sends task on c; the send blocks until c can accept the value.
func (p *Async) push(c chan *AsyncTask, task *AsyncTask) {
c <- task
}
// add routes task to the queue registered for task.Topics.
//
// A task whose topic has no queue is dropped without notice. A task that is
// not delayed, or whose ExpireTime has already passed, is pushed immediately.
// Any other task is pushed from a background goroutine once the time left
// until ExpireTime has elapsed. Waiting for the remainder rather than the full
// Delay means a delayed task that is re-queued some time after it was created
// (for example by Listen) does not start its delay over.
func (p *Async) add(task *AsyncTask) {
	queue, ok := p.Queue.Load(task.Topics)
	if !ok {
		return
	}
	// Queue values are expected to be chan *AsyncTask; anything else panics
	// here, exactly as it did at the point of use before.
	ch := queue.(chan *AsyncTask)

	if !task.IsDelay {
		p.push(ch, task)
		return
	}

	remaining := task.ExpireTime - utils.Time2StampSecond()
	if remaining <= 0 {
		p.push(ch, task)
		return
	}

	go func() {
		timer := time.NewTimer(time.Duration(remaining) * time.Second)
		defer timer.Stop()
		<-timer.C
		p.push(ch, task)
	}()
}