package mq

import (
	"epur-pay/pkg/idGenerate"
	"epur-pay/pkg/rds"
	"epur-pay/pkg/utils"
	"strconv"
	"sync"
	"time"
)

var AysncInstance *Async

type Async struct {
	//Tasks      AsyncTasks // 数据 model.AsyncTasks
	Queue     *sync.Map // chan model.AsyncTask
	handlerCh chan []interface{}
}

type Task struct {
	Topics           string  `json:"topics"`           // 主题
	Delay            int64   `json:"delay"`            // 延迟秒数
	Data             []byte  `json:"data"`             // 数据
	Ack              bool    `json:"bool"`             // 是否Ack
	RetryCount       int     `json:"retryCount"`       // 重试次数
	Transaction      bool    `json:"transaction"`      // 是否开启事物
	Store            bool    `json:"store"`            // 是否持久化
	ChainErrorTriger bool    `json:"chainErrorTriger"` // 是否错误继续调用
	Chains           []*Task `json:"-"`                // 链式调用
	Func             func()  `json:"-"`                // 函数调用
}

func New() *Async {
	AysncInstance = &Async{handlerCh: make(chan []interface{}, 1024*10), Queue: new(sync.Map)}
	return AysncInstance
}

func (p *Async) SetTopics(data ...string) *Async {
	for _, item := range data {
		ch := make(chan AsyncTask, 1024*10)
		AysncInstance.Queue.Store(item, ch)
	}
	return p
}

func (p *Async) Listen() {

	tasks := AsyncTasks{}
	utils.Error(rds.DB.Table(AsyncTask{}.TableName()).Where(
		"status in ?", []string{"2"}).Scan(&tasks).Error)

	for index := range tasks {
		tasks[index].Store = true
		AysncInstance.add(tasks[index])
	}
}

func (p *Async) createTaskTo(task *Task, parentTask *AsyncTask) *AsyncTask {

	task1 := &AsyncTask{
		Topics:           task.Topics,
		Data:             string(task.Data),
		Status:           "2",
		Ack:              task.Ack,
		Delay:            task.Delay,
		CreateTime:       utils.Time2StampSecond(),
		RetryCount:       task.RetryCount,
		Transaction:      task.Transaction,
		Store:            task.Store,
		ChainErrorTriger: task.ChainErrorTriger,
		Func:             task.Func,
	}

	if task1.Func != nil {
		task1.Store = false
	}

	if parentTask != nil {
		parentTask.Chains = append(parentTask.Chains, task1)
		task1.ParentId = parentTask.Id
		task1.Store = parentTask.Store
		task1.ChainErrorTriger = parentTask.ChainErrorTriger
	}

	if task1.Delay > 0 {
		task1.IsDelay = true
		task1.ExpireTime = task1.CreateTime + task1.Delay
	}
	if task1.RetryCount <= 0 {
		task1.RetryCount = 2
	}

	if task1.Store {
		utils.Error(rds.DB.Create(task1).Error)
	} else {
		task1.Id, _ = strconv.ParseInt(idGenerate.ID.Generate(""), 10, 64)
	}

	for idx := range task.Chains {
		if task.Chains[idx] != nil {
			p.createTaskTo(task.Chains[idx], task1)
		}
	}

	return task1
}

func (p *Async) Producer(task *Task) {
	task1 := p.createTaskTo(task, nil)
	if len(task1.Chains) > 0 {
		utils.DbErrSkipRecordNotFound(rds.DB.Save(task1).Error)
	}
	p.add(task1)
}

func (p *Async) Consumer(topics string, f ConsumerFunc, poolCount int) *Async {

	if _, ok := AysncInstance.Queue.Load(topics); !ok {
		ch := make(chan *AsyncTask, 1024*10)
		AysncInstance.Queue.Store(topics, ch)
	} else {
		return p
	}

	if poolCount <= 0 {
		poolCount = 1
	}

	for i := 0; i < poolCount; i++ {
		go func(topics string, f ConsumerFunc) {
			if ch, ok := AysncInstance.Queue.Load(topics); ok {

				//logger.AccessLogger.Infoln("MQ订阅:", topics)
				for row := range ch.(chan *AsyncTask) {
					//logger.AccessLogger.Infoln("TaskId:", row.Id)
					go func(row *AsyncTask) {
						api := ConsumerApi{
							F: f,
							A: row,
							P: p,
						}
						api.consumer()
					}(row)
				}
			}
		}(topics, f)
	}
	return p
}

func (p *Async) push(c chan *AsyncTask, task *AsyncTask) {
	c <- task
}

func (p *Async) add(task *AsyncTask) {

	queue, ok := p.Queue.Load(task.Topics)
	if !ok {
		return
	}

	if !task.IsDelay {
		p.push(queue.(chan *AsyncTask), task)
	} else if task.IsDelay && task.ExpireTime <= utils.Time2StampSecond() {
		p.push(queue.(chan *AsyncTask), task)
	} else {
		go func(task *AsyncTask, c chan *AsyncTask) {
			timer := time.NewTimer(time.Duration(task.Delay) * time.Second)
			select {
			case <-timer.C:
				p.push(queue.(chan *AsyncTask), task)
			}
			timer.Stop()
		}(task, queue.(chan *AsyncTask))
	}
}