package mq

import (
	"epur-pay/pkg/logger"
	"epur-pay/pkg/rds"
	"epur-pay/pkg/utils"
	"errors"
	"gorm.io/gorm"
	"reflect"
	"runtime"
)

func (p *ConsumerApi) consumerEx() {

	defer func() {
		err := recover()
		if err != nil {
			buf := make([]byte, 1<<16)
			runtime.Stack(buf, true)
			e := reflect.ValueOf(err)
			logger.ErrorLogger.Errorln(e.String(), string(buf))
			p.E = errors.New(string(buf))
		}
	}()

	var c int

	for {
		p.E = p.F(ComsumerParams{Data: []byte(p.A.Data), Func: p.A.Func, DB: p.DB})
		if p.E != nil && p.A.Ack && p.A.RetryCount > c {
			c++
			continue
		} else {
			break
		}
	}
}

func (p *ConsumerApi) consumer() {

	if p.A.Transaction {
		if err := rds.DB.Transaction(func(ts *gorm.DB) error {
			p.DB = ts
			p.consumerEx()
			p.end()
			return p.E
		}); err != nil {
			logger.ErrorLogger.Errorln(err.Error())
		}
	} else {
		p.DB = rds.DB
		p.consumerEx()
		p.end()
	}
}

func (p *ConsumerApi) end() {

	if p.E != nil {
		p.A.Status = "1"

		if p.A.Store {
			utils.Error(rds.DB.Table(p.A.TableName()).Where(
				"id=?", p.A.Id).Updates(map[string]interface{}{
				"status":  "1",
				"err_msg": p.E.Error(),
			}).Error)
		}
	} else {
		p.A.Status = "0"

		if p.A.Store {
			utils.Error(rds.DB.Table(p.A.TableName()).Where(
				"id=?", p.A.Id).Updates(map[string]interface{}{
				"status":        "0",
				"complete_time": utils.Time2StampSecond(),
			}).Error)
		}
	}

	if !(p.E != nil && !p.A.ChainErrorTriger) {
		for idx := range p.A.Chains {
			p.P.add(p.A.Chains[idx])
		}
	}
}