2025-04-18 10:08:14 +00:00
|
|
|
|
package sms_scheduler
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"fmt"
|
|
|
|
|
"go-admin/app/admin/models/bus_models"
|
|
|
|
|
"go-admin/common/database"
|
|
|
|
|
"go-admin/common/redisx"
|
2025-05-19 07:39:05 +00:00
|
|
|
|
"go-admin/tools/sms"
|
2025-04-18 10:08:14 +00:00
|
|
|
|
"golang.org/x/net/context"
|
2025-05-19 07:39:05 +00:00
|
|
|
|
"gorm.io/gorm/clause"
|
2025-04-18 10:08:14 +00:00
|
|
|
|
"log"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"gorm.io/gorm"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var db *gorm.DB // 需要先初始化你的 GORM DB
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
maxRetry = 3
|
|
|
|
|
workerPoolSize = 5
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func ProcessPendingBatches() {
|
|
|
|
|
log.Println("📡 [Scheduler] 扫描待发送批次...")
|
|
|
|
|
db = database.Db
|
|
|
|
|
batches := fetchPendingBatches()
|
|
|
|
|
if len(batches) == 0 {
|
|
|
|
|
log.Println("ℹ️ 无待发批次")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
|
sem := make(chan struct{}, workerPoolSize)
|
|
|
|
|
|
|
|
|
|
for _, batch := range batches {
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
sem <- struct{}{} // 限制并发
|
|
|
|
|
|
|
|
|
|
go func(batch bus_models.SmsTaskBatch) {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
defer func() { <-sem }()
|
|
|
|
|
|
|
|
|
|
err := lockAndProcessBatch(batch)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Printf("❌ 批次处理失败: batch_id=%s, err=%v\n", batch.BatchID, err)
|
|
|
|
|
}
|
|
|
|
|
}(batch)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
wg.Wait()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func fetchPendingBatches() []bus_models.SmsTaskBatch {
|
|
|
|
|
var batches []bus_models.SmsTaskBatch
|
2025-05-19 07:39:05 +00:00
|
|
|
|
db.Where("status = ? AND (schedule_time IS NULL OR schedule_time <= ?)", 0, time.Now()).Find(&batches)
|
2025-04-18 10:08:14 +00:00
|
|
|
|
return batches
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func lockAndProcessBatch(batch bus_models.SmsTaskBatch) error {
|
|
|
|
|
tx := db.Begin()
|
|
|
|
|
defer func() {
|
|
|
|
|
if r := recover(); r != nil {
|
|
|
|
|
tx.Rollback()
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// 锁定批次
|
|
|
|
|
res := tx.Model(&bus_models.SmsTaskBatch{}).
|
|
|
|
|
Where("id = ? AND status = 0", batch.ID).
|
|
|
|
|
Update("status", 1) // 标记发送中
|
|
|
|
|
|
|
|
|
|
if res.RowsAffected == 0 {
|
|
|
|
|
tx.Rollback()
|
|
|
|
|
return nil // 已被其他任务抢占
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tx.Commit()
|
|
|
|
|
log.Printf("🚀 开始处理批次:batch_id=%s\n", batch.BatchID)
|
|
|
|
|
|
|
|
|
|
err := processBatchPhones(batch)
|
|
|
|
|
if err != nil {
|
|
|
|
|
// 更新失败状态
|
|
|
|
|
db.Model(&bus_models.SmsTaskBatch{}).Where("id = ?", batch.ID).Update("status", 3)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 更新成功状态
|
|
|
|
|
db.Model(&bus_models.SmsTaskBatch{}).Where("id = ?", batch.ID).Update("status", 1)
|
|
|
|
|
updateTaskStatusIfFinished(batch.TaskID)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func processBatchPhones(batch bus_models.SmsTaskBatch) error {
|
|
|
|
|
ctx := context.Background()
|
|
|
|
|
importID := batch.ImportID
|
|
|
|
|
num := batch.Num
|
|
|
|
|
|
|
|
|
|
key := fmt.Sprintf("sms:import:%s:%d", importID, num)
|
|
|
|
|
phones, err := redisx.Client.LRange(ctx, key, 0, -1).Result()
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Printf("❌ 获取手机号缓存失败:%v\n", err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if len(phones) == 0 {
|
|
|
|
|
log.Printf("⚠️ 无手机号可处理,import_id=%s, num=%d\n", importID, num)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var recordsToInsert []bus_models.SmsSendRecord
|
|
|
|
|
|
|
|
|
|
for _, phone := range phones {
|
|
|
|
|
// 避免重复发送:判断该手机号是否已存在记录
|
|
|
|
|
var count int64
|
|
|
|
|
db.Model(&bus_models.SmsSendRecord{}).
|
|
|
|
|
Where("batch_id = ? AND phone = ?", batch.BatchID, phone).
|
|
|
|
|
Count(&count)
|
|
|
|
|
if count > 0 {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 实际发送短信
|
|
|
|
|
success := sendSms(phone, batch.SmsContent)
|
|
|
|
|
status := 1 // 成功
|
|
|
|
|
if !success {
|
|
|
|
|
status = 2 // 失败
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
record := bus_models.SmsSendRecord{
|
|
|
|
|
CooperativeNumber: batch.CooperativeNumber,
|
|
|
|
|
CooperativeName: batch.CooperativeName,
|
|
|
|
|
TaskID: batch.TaskID,
|
|
|
|
|
TaskBatchID: batch.ID,
|
|
|
|
|
Phone: phone,
|
|
|
|
|
BatchID: batch.BatchID,
|
|
|
|
|
SmsContent: batch.SmsContent,
|
|
|
|
|
Status: status,
|
|
|
|
|
}
|
|
|
|
|
recordsToInsert = append(recordsToInsert, record)
|
|
|
|
|
|
|
|
|
|
log.Printf("📨 发送短信:%s,状态=%d\n", phone, status)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 批量插入发送记录
|
|
|
|
|
if len(recordsToInsert) > 0 {
|
|
|
|
|
if err := db.Create(&recordsToInsert).Error; err != nil {
|
|
|
|
|
log.Printf("❌ 批量写入发送记录失败:%v\n", err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func updateTaskStatusIfFinished(taskID uint64) {
|
|
|
|
|
var pendingCount int64
|
|
|
|
|
db.Model(&bus_models.SmsTaskBatch{}).
|
|
|
|
|
Where("task_id = ? AND status = 0", taskID).
|
|
|
|
|
Count(&pendingCount)
|
|
|
|
|
|
|
|
|
|
if pendingCount > 0 {
|
|
|
|
|
return // 尚有批次未完成,直接返回
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 统计该任务所有失败短信数(status = 2)
|
|
|
|
|
var failCount int64
|
|
|
|
|
db.Model(&bus_models.SmsSendRecord{}).
|
|
|
|
|
Where("task_id = ? AND status = 2", taskID).
|
|
|
|
|
Count(&failCount)
|
|
|
|
|
|
|
|
|
|
// 更新任务状态为已完成,并记录失败条数
|
|
|
|
|
db.Model(&bus_models.SmsTask{}).
|
|
|
|
|
Where("id = ?", taskID).
|
|
|
|
|
Updates(map[string]interface{}{
|
|
|
|
|
"status": 2,
|
|
|
|
|
"channel_fail_count": failCount,
|
|
|
|
|
})
|
|
|
|
|
|
2025-05-19 07:39:05 +00:00
|
|
|
|
// 查询任务详细信息
|
|
|
|
|
var task bus_models.SmsTask
|
|
|
|
|
if err := db.Model(&bus_models.SmsTask{}).Where("id = ?", taskID).First(&task).Error; err != nil {
|
|
|
|
|
log.Printf("查询SmsTask失败, taskID=%d, err=%v", taskID, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 查询合作商信息
|
|
|
|
|
var coop bus_models.BusCooperative
|
|
|
|
|
if err := db.Model(&bus_models.BusCooperative{}).Where("cooperative_number = ?", task.CooperativeNumber).First(&coop).Error; err != nil {
|
|
|
|
|
log.Printf("查询合作商失败, CooperativeNumber=%s, err=%v", task.CooperativeNumber, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 查询合作商产品信息
|
|
|
|
|
var coopProduct []bus_models.BusCooperativeProduct
|
|
|
|
|
if err := db.Where("cooperative_id = ?", task.CooperativeNumber).Find(&coopProduct).Error; err != nil {
|
|
|
|
|
log.Printf("查询合作商产品失败: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
productFlag := false
|
|
|
|
|
var productPrice float64
|
|
|
|
|
for _, item := range coopProduct {
|
|
|
|
|
// 查询产品信息
|
|
|
|
|
var product bus_models.BusProduct
|
|
|
|
|
if err := db.Where("id = ?", item.ProductID).First(&product).Error; err != nil {
|
|
|
|
|
log.Printf("查询产品失败: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 只处理短信产品 (ProductType=1)
|
|
|
|
|
if product.ProductType != 1 {
|
|
|
|
|
continue
|
|
|
|
|
} else {
|
|
|
|
|
productPrice = product.Price * product.Discount
|
|
|
|
|
productFlag = true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !productFlag {
|
|
|
|
|
log.Printf("未配置短信产品")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 记录短信消耗情况
|
|
|
|
|
// 计算总消耗金额
|
|
|
|
|
totalAmount := float64(task.TotalSmsCount) * productPrice
|
|
|
|
|
|
|
|
|
|
if totalAmount > 0 {
|
|
|
|
|
// ⚡ 处理扣款逻辑
|
|
|
|
|
err := db.Transaction(func(tx *gorm.DB) error {
|
|
|
|
|
// 重新锁定合作商数据
|
|
|
|
|
var lockedCoop bus_models.BusCooperative
|
|
|
|
|
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
|
|
|
|
|
Where("cooperative_number = ?", task.CooperativeNumber).
|
|
|
|
|
First(&lockedCoop).Error; err != nil {
|
|
|
|
|
return fmt.Errorf("锁定合作商失败: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
remain := totalAmount
|
|
|
|
|
updateFields := make(map[string]interface{})
|
|
|
|
|
|
|
|
|
|
// 先扣余额
|
|
|
|
|
if lockedCoop.Balance >= remain {
|
|
|
|
|
updateFields["balance"] = lockedCoop.Balance - remain
|
|
|
|
|
remain = 0
|
|
|
|
|
} else {
|
|
|
|
|
remain -= lockedCoop.Balance
|
|
|
|
|
updateFields["balance"] = 0.0
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 余额不够,再扣赠送余额
|
|
|
|
|
if remain > 0 {
|
|
|
|
|
if lockedCoop.Free >= remain {
|
|
|
|
|
updateFields["free"] = lockedCoop.Free - remain
|
|
|
|
|
remain = 0
|
|
|
|
|
} else {
|
|
|
|
|
updateFields["free"] = 0.0
|
|
|
|
|
remain -= lockedCoop.Free
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 如果到这里 remain > 0,说明账户和赠送余额都不够
|
|
|
|
|
if remain > 0 {
|
|
|
|
|
log.Printf("⚠️ 合作商账户和赠送余额均不足,CooperativeNumber=%s, 欠费金额=%.3f", task.CooperativeNumber, remain)
|
|
|
|
|
// 你可以根据需要抛错或者继续执行
|
|
|
|
|
return fmt.Errorf("账户余额不足")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 更新合作商账户
|
|
|
|
|
if err := tx.Model(&bus_models.BusCooperative{}).
|
|
|
|
|
Where("id = ?", lockedCoop.ID).
|
|
|
|
|
Updates(updateFields).Error; err != nil {
|
|
|
|
|
return fmt.Errorf("更新合作商账户失败: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 记录消耗日志
|
|
|
|
|
if err := bus_models.RecordCooperativeConsumptionLog(
|
|
|
|
|
tx,
|
|
|
|
|
task.CooperativeNumber,
|
|
|
|
|
1, // 消耗类型:短信发送
|
|
|
|
|
task.ID,
|
|
|
|
|
totalAmount,
|
|
|
|
|
fmt.Sprintf("短信发送任务(任务ID:%d)", task.ID),
|
|
|
|
|
fmt.Sprintf("总发送条数:%d, 失败条数:%d, 实际计费条数:%d, 单价:%.3f元/条", task.TotalSmsCount,
|
|
|
|
|
task.ChannelFailCount, task.TotalSmsCount, productPrice),
|
|
|
|
|
); err != nil {
|
|
|
|
|
return fmt.Errorf("记录合作商消耗日志失败: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Printf("更新合作商扣款失败: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2025-04-18 10:08:14 +00:00
|
|
|
|
log.Printf("🎉 任务 task_id=%d 已全部完成!失败短信总数=%d\n", taskID, failCount)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 模拟发送逻辑(生产环境接通短信通道)
|
|
|
|
|
func sendSms(phone, content string) bool {
|
2025-05-19 07:39:05 +00:00
|
|
|
|
//// 可接入真实短信发送SDK
|
|
|
|
|
//time.Sleep(10 * time.Millisecond)
|
|
|
|
|
//return rand.Intn(4) != 0 // 25% 失败
|
|
|
|
|
|
|
|
|
|
var phoneList []string
|
|
|
|
|
phoneList = append(phoneList, phone)
|
|
|
|
|
|
|
|
|
|
err := sms.GtSendMessage(phoneList, content)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Printf("GtSendMessage err: %v", err)
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
return true
|
2025-04-18 10:08:14 +00:00
|
|
|
|
}
|