package mq import ( "database/sql/driver" "encoding/json" "gorm.io/gorm" ) type AsyncTask struct { Id int64 `gorm:"primary_key;column:id" json:"id"` // 唯一值 ParentId int64 `gorm:"column:parent_id" json:"parentId"` // 父级ID Topics string `gorm:"column:topics" json:"topics"` // 主题 IsDelay bool `gorm:"column:is_delay" json:"isDelay"` // 是否延迟执行 Delay int64 `gorm:"column:delay" json:"delay"` // 延迟秒数 ExpireTime int64 `gorm:"column:exprie_time" json:"expireTime"` // 延迟执行时间 Data string `gorm:"column:data" json:"data"` // 数据 ErrMsg string `gorm:"column:err_msg" json:"errMsg"` // 错误信息 Status string `gorm:"column:status" json:"status"` // 0-成功 1-失败 2-等待执行 Ack bool `gorm:"column:ack" json:"ack"` // 是否Ack RetryCount int `gorm:"column:retry_count" json:"retryCount"` // 重试次数 Transaction bool `gorm:"column:transaction" json:"transaction"` // 是否开启事物 CreateTime int64 `gorm:"column:create_time" json:"createTime"` // 创建时间 CompleteTime int64 `gorm:"column:complete_time" json:"completeTime"` // 完成时间 ChainErrorTriger bool `gorm:"column:chain_error_triger" json:"chainErrorTriger"` // 是否错误继续调用 Store bool `gorm:"-" json:"-"` // 是否持久话 Chains AsyncTasks `gorm:"-" json:"chains"` // 链式调用 Func func() `gorm:"-" json:"-"` // 函数调用 } func (AsyncTask) TableName() string { return "async_task" } type AsyncTasks []*AsyncTask func (j *AsyncTasks) Scan(value interface{}) error { return json.Unmarshal(value.([]byte), &j) } func (j AsyncTasks) Value() (driver.Value, error) { return json.Marshal(j) } func (s AsyncTasks) Len() int { return len(s) } func (s AsyncTasks) Less(i, j int) bool { return s[i].ExpireTime < s[j].ExpireTime } func (s AsyncTasks) Swap(i, j int) { s[i], s[j] = s[j], s[i] } type ComsumerParams struct { Data []byte Func func() DB *gorm.DB } type ConsumerFunc func(body ComsumerParams) error type ConsumerApi struct { E error F ConsumerFunc A *AsyncTask P *Async DB *gorm.DB }