telco_server/common/sms_scheduler/scheduler.go
chenlin 120379a0f9 1.新增权限校验;
2.修改登陆和校验接口(Authenticator、PayloadFunc);
3.对接绿信通接口;
2025-05-19 15:39:05 +08:00

319 lines
8.2 KiB
Go
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package sms_scheduler
import (
"fmt"
"go-admin/app/admin/models/bus_models"
"go-admin/common/database"
"go-admin/common/redisx"
"go-admin/tools/sms"
"golang.org/x/net/context"
"gorm.io/gorm/clause"
"log"
"sync"
"time"
"gorm.io/gorm"
)
// db is the package-level GORM handle used by every query in this file.
// ProcessPendingBatches assigns it from database.Db at the start of each run,
// so it is nil until that function has been called at least once.
var db *gorm.DB // must be initialised before any query helper runs
const (
// maxRetry is not referenced by any code visible in this file.
// NOTE(review): presumably a per-message retry limit that was never wired in — confirm or remove.
maxRetry = 3
// workerPoolSize is the capacity of the semaphore channel in
// ProcessPendingBatches, i.e. the maximum number of batches processed concurrently.
workerPoolSize = 5
)
// ProcessPendingBatches runs one scheduler pass: it refreshes the package DB
// handle from database.Db, loads every batch that is due, and processes the
// batches concurrently with at most workerPoolSize workers in flight.
// It returns only after every started worker has finished. Per-batch failures
// are logged and do not stop the remaining batches.
func ProcessPendingBatches() {
	log.Println("📡 [Scheduler] 扫描待发送批次...")
	db = database.Db

	pending := fetchPendingBatches()
	if len(pending) == 0 {
		log.Println(" 无待发批次")
		return
	}

	var (
		workers sync.WaitGroup
		slots   = make(chan struct{}, workerPoolSize)
	)
	for _, item := range pending {
		workers.Add(1)
		// Blocks while workerPoolSize workers are already running.
		slots <- struct{}{}
		go func(b bus_models.SmsTaskBatch) {
			defer workers.Done()
			defer func() { <-slots }() // free the slot before signalling completion
			if err := lockAndProcessBatch(b); err != nil {
				log.Printf("❌ 批次处理失败: batch_id=%s, err=%v\n", b.BatchID, err)
			}
		}(item)
	}
	workers.Wait()
}
// fetchPendingBatches returns every batch whose status is 0 and whose
// schedule_time is either NULL or not later than the current time.
//
// A query failure is logged and reported as an empty result, so the caller
// treats it the same way as "nothing to do" instead of working on a
// partially populated slice.
func fetchPendingBatches() []bus_models.SmsTaskBatch {
	var batches []bus_models.SmsTaskBatch
	err := db.
		Where("status = ? AND (schedule_time IS NULL OR schedule_time <= ?)", 0, time.Now()).
		Find(&batches).Error
	if err != nil {
		log.Printf("❌ 查询待发送批次失败: %v\n", err)
		return nil
	}
	return batches
}
// lockAndProcessBatch claims a single batch and, when the claim succeeds,
// sends it and then checks whether the owning task can be finalized.
//
// The claim is a conditional UPDATE (status 0 -> 1) executed in its own
// transaction; when no row is affected another worker already owns the batch
// and the function returns nil without doing anything.
//
// Returns a non-nil error when the claim cannot be executed or committed,
// when processBatchPhones fails (the batch is then marked with status 3), or
// when a panic is recovered while the batch is being handled.
func lockAndProcessBatch(batch bus_models.SmsTaskBatch) (err error) {
	tx := db.Begin()
	if tx.Error != nil {
		return fmt.Errorf("开启事务失败: %w", tx.Error)
	}

	txOpen := true
	defer func() {
		if r := recover(); r != nil {
			if txOpen {
				tx.Rollback()
			}
			// Surface the panic to the caller instead of silently returning nil.
			err = fmt.Errorf("处理批次时发生 panic: %v", r)
		}
	}()

	// Claim the batch: only a row that is still pending (status 0) is moved to
	// "sending" (status 1).
	claim := tx.Model(&bus_models.SmsTaskBatch{}).
		Where("id = ? AND status = 0", batch.ID).
		Update("status", 1)
	if claim.Error != nil {
		txOpen = false
		tx.Rollback()
		return fmt.Errorf("锁定批次失败: %w", claim.Error)
	}
	if claim.RowsAffected == 0 {
		// Already taken by another worker.
		txOpen = false
		tx.Rollback()
		return nil
	}
	txOpen = false
	if commitErr := tx.Commit().Error; commitErr != nil {
		return fmt.Errorf("提交批次锁定失败: %w", commitErr)
	}

	log.Printf("🚀 开始处理批次batch_id=%s\n", batch.BatchID)

	if sendErr := processBatchPhones(batch); sendErr != nil {
		// Mark the batch as failed (status 3); a failure of this bookkeeping
		// write is logged so it does not mask the original error.
		if markErr := db.Model(&bus_models.SmsTaskBatch{}).
			Where("id = ?", batch.ID).
			Update("status", 3).Error; markErr != nil {
			log.Printf("❌ 更新批次失败状态出错: batch_id=%s, err=%v\n", batch.BatchID, markErr)
		}
		return sendErr
	}

	// NOTE(review): the success path writes status 1, the same value used for
	// "sending" above. If a distinct "done" value exists for batches (2 looks
	// likely given 0/1/3), this should probably use it — confirm against the
	// SmsTaskBatch status enum before changing.
	if markErr := db.Model(&bus_models.SmsTaskBatch{}).
		Where("id = ?", batch.ID).
		Update("status", 1).Error; markErr != nil {
		log.Printf("❌ 更新批次成功状态出错: batch_id=%s, err=%v\n", batch.BatchID, markErr)
	}

	updateTaskStatusIfFinished(batch.TaskID)
	return nil
}
// processBatchPhones sends the batch's SMS content to every phone number
// stored in the Redis list "sms:import:<ImportID>:<Num>" and stores one
// SmsSendRecord per attempted send (status 1 = sent, 2 = failed).
//
// Phones that already have a send record for this batch are skipped. The set
// of such phones is loaded with a single query before the loop; a failure of
// that query aborts the batch, because treating it as "nothing sent yet"
// would re-send messages. Numbers repeated inside the Redis list itself are
// not de-duplicated.
//
// Returns an error when the Redis read, the lookup of existing records, or
// the bulk insert fails. An empty phone list is not an error.
func processBatchPhones(batch bus_models.SmsTaskBatch) error {
	ctx := context.Background()
	key := fmt.Sprintf("sms:import:%s:%d", batch.ImportID, batch.Num)

	phones, err := redisx.Client.LRange(ctx, key, 0, -1).Result()
	if err != nil {
		log.Printf("❌ 获取手机号缓存失败:%v\n", err)
		return err
	}
	if len(phones) == 0 {
		log.Printf("⚠️ 无手机号可处理import_id=%s, num=%d\n", batch.ImportID, batch.Num)
		return nil
	}

	// Load the phones already recorded for this batch in one round trip.
	var recorded []string
	if err := db.Model(&bus_models.SmsSendRecord{}).
		Where("batch_id = ?", batch.BatchID).
		Pluck("phone", &recorded).Error; err != nil {
		return fmt.Errorf("查询已发送记录失败: %w", err)
	}
	alreadySent := make(map[string]struct{}, len(recorded))
	for _, p := range recorded {
		alreadySent[p] = struct{}{}
	}

	records := make([]bus_models.SmsSendRecord, 0, len(phones))
	for _, phone := range phones {
		if _, ok := alreadySent[phone]; ok {
			continue
		}

		status := 1 // sent
		if !sendSms(phone, batch.SmsContent) {
			status = 2 // failed
		}

		records = append(records, bus_models.SmsSendRecord{
			CooperativeNumber: batch.CooperativeNumber,
			CooperativeName:   batch.CooperativeName,
			TaskID:            batch.TaskID,
			TaskBatchID:       batch.ID,
			Phone:             phone,
			BatchID:           batch.BatchID,
			SmsContent:        batch.SmsContent,
			Status:            status,
		})
		log.Printf("📨 发送短信:%s状态=%d\n", phone, status)
	}

	// Persist all send results in one insert.
	if len(records) > 0 {
		if err := db.Create(&records).Error; err != nil {
			log.Printf("❌ 批量写入发送记录失败:%v\n", err)
			return err
		}
	}
	return nil
}
func updateTaskStatusIfFinished(taskID uint64) {
var pendingCount int64
db.Model(&bus_models.SmsTaskBatch{}).
Where("task_id = ? AND status = 0", taskID).
Count(&pendingCount)
if pendingCount > 0 {
return // 尚有批次未完成,直接返回
}
// 统计该任务所有失败短信数status = 2
var failCount int64
db.Model(&bus_models.SmsSendRecord{}).
Where("task_id = ? AND status = 2", taskID).
Count(&failCount)
// 更新任务状态为已完成,并记录失败条数
db.Model(&bus_models.SmsTask{}).
Where("id = ?", taskID).
Updates(map[string]interface{}{
"status": 2,
"channel_fail_count": failCount,
})
// 查询任务详细信息
var task bus_models.SmsTask
if err := db.Model(&bus_models.SmsTask{}).Where("id = ?", taskID).First(&task).Error; err != nil {
log.Printf("查询SmsTask失败, taskID=%d, err=%v", taskID, err)
return
}
// 查询合作商信息
var coop bus_models.BusCooperative
if err := db.Model(&bus_models.BusCooperative{}).Where("cooperative_number = ?", task.CooperativeNumber).First(&coop).Error; err != nil {
log.Printf("查询合作商失败, CooperativeNumber=%s, err=%v", task.CooperativeNumber, err)
return
}
// 查询合作商产品信息
var coopProduct []bus_models.BusCooperativeProduct
if err := db.Where("cooperative_id = ?", task.CooperativeNumber).Find(&coopProduct).Error; err != nil {
log.Printf("查询合作商产品失败: %v", err)
return
}
productFlag := false
var productPrice float64
for _, item := range coopProduct {
// 查询产品信息
var product bus_models.BusProduct
if err := db.Where("id = ?", item.ProductID).First(&product).Error; err != nil {
log.Printf("查询产品失败: %v", err)
return
}
// 只处理短信产品 (ProductType=1)
if product.ProductType != 1 {
continue
} else {
productPrice = product.Price * product.Discount
productFlag = true
}
}
if !productFlag {
log.Printf("未配置短信产品")
return
}
// 记录短信消耗情况
// 计算总消耗金额
totalAmount := float64(task.TotalSmsCount) * productPrice
if totalAmount > 0 {
// ⚡ 处理扣款逻辑
err := db.Transaction(func(tx *gorm.DB) error {
// 重新锁定合作商数据
var lockedCoop bus_models.BusCooperative
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
Where("cooperative_number = ?", task.CooperativeNumber).
First(&lockedCoop).Error; err != nil {
return fmt.Errorf("锁定合作商失败: %w", err)
}
remain := totalAmount
updateFields := make(map[string]interface{})
// 先扣余额
if lockedCoop.Balance >= remain {
updateFields["balance"] = lockedCoop.Balance - remain
remain = 0
} else {
remain -= lockedCoop.Balance
updateFields["balance"] = 0.0
}
// 余额不够,再扣赠送余额
if remain > 0 {
if lockedCoop.Free >= remain {
updateFields["free"] = lockedCoop.Free - remain
remain = 0
} else {
updateFields["free"] = 0.0
remain -= lockedCoop.Free
}
}
// 如果到这里 remain > 0说明账户和赠送余额都不够
if remain > 0 {
log.Printf("⚠️ 合作商账户和赠送余额均不足CooperativeNumber=%s, 欠费金额=%.3f", task.CooperativeNumber, remain)
// 你可以根据需要抛错或者继续执行
return fmt.Errorf("账户余额不足")
}
// 更新合作商账户
if err := tx.Model(&bus_models.BusCooperative{}).
Where("id = ?", lockedCoop.ID).
Updates(updateFields).Error; err != nil {
return fmt.Errorf("更新合作商账户失败: %w", err)
}
// 记录消耗日志
if err := bus_models.RecordCooperativeConsumptionLog(
tx,
task.CooperativeNumber,
1, // 消耗类型:短信发送
task.ID,
totalAmount,
fmt.Sprintf("短信发送任务任务ID:%d", task.ID),
fmt.Sprintf("总发送条数:%d, 失败条数:%d, 实际计费条数:%d, 单价:%.3f元/条", task.TotalSmsCount,
task.ChannelFailCount, task.TotalSmsCount, productPrice),
); err != nil {
return fmt.Errorf("记录合作商消耗日志失败: %w", err)
}
return nil
})
if err != nil {
log.Printf("更新合作商扣款失败: %v", err)
return
}
}
log.Printf("🎉 任务 task_id=%d 已全部完成!失败短信总数=%d\n", taskID, failCount)
}
// sendSms sends content to one phone number through sms.GtSendMessage.
// It reports true when the call returns no error; on error the failure is
// logged and false is returned.
func sendSms(phone, content string) bool {
	if err := sms.GtSendMessage([]string{phone}, content); err != nil {
		log.Printf("GtSendMessage err: %v", err)
		return false
	}
	return true
}