telco_server/common/sms_scheduler/scheduler.go

190 lines
4.5 KiB
Go
Raw Normal View History

package sms_scheduler
import (
"fmt"
"go-admin/app/admin/models/bus_models"
"go-admin/common/database"
"go-admin/common/redisx"
"golang.org/x/net/context"
"log"
"math/rand"
"sync"
"time"
"gorm.io/gorm"
)
// db is the package-level GORM handle used by the queries in this file.
// ProcessPendingBatches assigns database.Db to it at the start of every run,
// so database.Db has to be initialized before the scheduler is invoked.
var db *gorm.DB
const (
	// maxRetry is not referenced anywhere in the visible part of this file.
	// NOTE(review): presumably the intended per-batch retry limit — confirm or remove.
	maxRetry = 3
	// workerPoolSize is the capacity of the semaphore channel in
	// ProcessPendingBatches, i.e. the maximum number of batches processed concurrently.
	workerPoolSize = 5
)
// ProcessPendingBatches runs one scheduler pass: it refreshes the package-level
// db handle from database.Db, loads the batches that are due, and processes
// them with at most workerPoolSize goroutines in flight. It returns only after
// every started goroutine has finished. Per-batch failures are logged, not returned.
func ProcessPendingBatches() {
	log.Println("📡 [Scheduler] 扫描待发送批次...")
	db = database.Db

	pending := fetchPendingBatches()
	if len(pending) == 0 {
		log.Println(" 无待发批次")
		return
	}

	var workers sync.WaitGroup
	// Buffered channel used as a counting semaphore: a send takes a slot,
	// a receive gives it back.
	slots := make(chan struct{}, workerPoolSize)

	for i := range pending {
		workers.Add(1)
		slots <- struct{}{} // blocks while workerPoolSize batches are already running

		go func(b bus_models.SmsTaskBatch) {
			defer func() {
				<-slots
				workers.Done()
			}()

			if err := lockAndProcessBatch(b); err != nil {
				log.Printf("❌ 批次处理失败: batch_id=%s, err=%v\n", b.BatchID, err)
			}
		}(pending[i])
	}

	workers.Wait()
}
// fetchPendingBatches loads at most 10 batches whose status is 0 and whose
// schedule_time is either NULL or not later than the current time.
//
// A query failure is logged and reported as "no batches" (nil slice), because
// the signature has no error return; the caller then simply skips this pass.
func fetchPendingBatches() []bus_models.SmsTaskBatch {
	var batches []bus_models.SmsTaskBatch
	err := db.Where("status = ? AND (schedule_time IS NULL OR schedule_time <= ?)", 0, time.Now()).
		Limit(10).
		Find(&batches).Error
	if err != nil {
		log.Printf("❌ 查询待发送批次失败: %v\n", err)
		return nil
	}
	return batches
}
// lockAndProcessBatch claims a single pending batch, sends it, and records the
// final batch status.
//
// The claim is a conditional UPDATE (status 0 -> 1) executed in its own
// transaction; when it affects no rows the batch was already taken by another
// worker and the function returns nil without doing anything.
//
// It returns an error when the claim itself fails, when sending fails (the
// batch is then marked with status 3), when the final status cannot be
// written, or when a panic was recovered while processing.
func lockAndProcessBatch(batch bus_models.SmsTaskBatch) (err error) {
	// Batch status values written by this function.
	const (
		statusSending = 1
		// NOTE(review): the original wrote 1 here, which is indistinguishable
		// from "sending"; 2 is inferred from the unused gap between 1 and 3 —
		// confirm against the sms_task_batch schema.
		statusDone   = 2
		statusFailed = 3
	)

	setStatus := func(status int) error {
		return db.Model(&bus_models.SmsTaskBatch{}).
			Where("id = ?", batch.ID).
			Update("status", status).Error
	}

	tx := db.Begin()
	if tx.Error != nil {
		return fmt.Errorf("begin lock transaction for batch_id=%s: %w", batch.BatchID, tx.Error)
	}

	claimed := false
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		// Surface the panic to the caller instead of silently returning nil.
		err = fmt.Errorf("panic while processing batch_id=%s: %v", batch.BatchID, r)
		if !claimed {
			tx.Rollback()
			return
		}
		// The claim is already committed, so a rollback cannot undo it; mark
		// the batch failed rather than leaving it stuck in "sending".
		if markErr := setStatus(statusFailed); markErr != nil {
			log.Printf("❌ 更新批次失败状态出错: batch_id=%s, err=%v\n", batch.BatchID, markErr)
		}
	}()

	// Claim the batch: only one worker can move it from 0 to "sending".
	res := tx.Model(&bus_models.SmsTaskBatch{}).
		Where("id = ? AND status = 0", batch.ID).
		Update("status", statusSending)
	if res.Error != nil {
		tx.Rollback()
		return fmt.Errorf("lock batch_id=%s: %w", batch.BatchID, res.Error)
	}
	if res.RowsAffected == 0 {
		tx.Rollback()
		return nil // already claimed by another worker
	}
	if commitErr := tx.Commit().Error; commitErr != nil {
		return fmt.Errorf("commit lock for batch_id=%s: %w", batch.BatchID, commitErr)
	}
	claimed = true

	log.Printf("🚀 开始处理批次batch_id=%s\n", batch.BatchID)

	if sendErr := processBatchPhones(batch); sendErr != nil {
		if markErr := setStatus(statusFailed); markErr != nil {
			log.Printf("❌ 更新批次失败状态出错: batch_id=%s, err=%v\n", batch.BatchID, markErr)
		}
		return sendErr
	}

	markErr := setStatus(statusDone)
	// The messages have been sent either way, so still give the parent task a
	// chance to be closed even if the batch status write failed.
	updateTaskStatusIfFinished(batch.TaskID)
	if markErr != nil {
		return fmt.Errorf("mark batch_id=%s done: %w", batch.BatchID, markErr)
	}
	return nil
}
// processBatchPhones sends the batch's SMS content to every phone number
// stored in the Redis list "sms:import:<ImportID>:<Num>" and then writes one
// SmsSendRecord per attempted send (status 1 = success, 2 = failure).
//
// Phones that already have a send record for this batch are skipped. The
// existing records are loaded with a single query up front; a failure of that
// query aborts the batch instead of being read as "nothing sent yet", which
// would cause duplicate sends.
//
// It returns nil when the list is empty, and a wrapped error when Redis, the
// lookup query, or the final insert fails.
//
// NOTE(review): records are only persisted after the whole loop, so a crash
// or insert failure after sending leaves already-sent messages unrecorded.
func processBatchPhones(batch bus_models.SmsTaskBatch) error {
	ctx := context.Background()
	key := fmt.Sprintf("sms:import:%s:%d", batch.ImportID, batch.Num)

	phones, err := redisx.Client.LRange(ctx, key, 0, -1).Result()
	if err != nil {
		return fmt.Errorf("load phones from redis key %q: %w", key, err)
	}
	if len(phones) == 0 {
		log.Printf("⚠️ 无手机号可处理import_id=%s, num=%d\n", batch.ImportID, batch.Num)
		return nil
	}

	// Load the phones that already have a record for this batch in one query
	// rather than issuing one COUNT per phone.
	var recorded []string
	err = db.Model(&bus_models.SmsSendRecord{}).
		Where("batch_id = ?", batch.BatchID).
		Pluck("phone", &recorded).Error
	if err != nil {
		return fmt.Errorf("load existing send records for batch_id=%s: %w", batch.BatchID, err)
	}
	alreadySent := make(map[string]struct{}, len(recorded))
	for _, p := range recorded {
		alreadySent[p] = struct{}{}
	}

	recordsToInsert := make([]bus_models.SmsSendRecord, 0, len(phones))
	for _, phone := range phones {
		if _, done := alreadySent[phone]; done {
			continue
		}

		status := 1 // success
		if !sendSms(phone, batch.SmsContent) {
			status = 2 // failure
		}

		recordsToInsert = append(recordsToInsert, bus_models.SmsSendRecord{
			CooperativeNumber: batch.CooperativeNumber,
			CooperativeName:   batch.CooperativeName,
			TaskID:            batch.TaskID,
			TaskBatchID:       batch.ID,
			Phone:             phone,
			BatchID:           batch.BatchID,
			SmsContent:        batch.SmsContent,
			Status:            status,
		})
		log.Printf("📨 发送短信:%s状态=%d\n", phone, status)
	}

	if len(recordsToInsert) == 0 {
		return nil
	}
	if err := db.Create(&recordsToInsert).Error; err != nil {
		return fmt.Errorf("insert send records for batch_id=%s: %w", batch.BatchID, err)
	}
	return nil
}
// updateTaskStatusIfFinished marks the task as completed (status 2) once it
// has no batch left with status 0, and stores the number of failed send
// records (status 2) in channel_fail_count.
//
// Any database error is logged and aborts the update: a failed count must not
// be read as "zero pending batches", otherwise the task would be closed early.
//
// NOTE(review): only status 0 batches count as unfinished, so batches that
// another worker has claimed (status 1) but not finished do not block
// completion — confirm whether that is intended.
func updateTaskStatusIfFinished(taskID uint64) {
	var pendingCount int64
	err := db.Model(&bus_models.SmsTaskBatch{}).
		Where("task_id = ? AND status = 0", taskID).
		Count(&pendingCount).Error
	if err != nil {
		log.Printf("❌ 统计待发送批次失败: task_id=%d, err=%v\n", taskID, err)
		return
	}
	if pendingCount > 0 {
		return // some batches have not been picked up yet
	}

	// Count every failed message of this task (record status 2).
	var failCount int64
	err = db.Model(&bus_models.SmsSendRecord{}).
		Where("task_id = ? AND status = 2", taskID).
		Count(&failCount).Error
	if err != nil {
		log.Printf("❌ 统计失败短信数出错: task_id=%d, err=%v\n", taskID, err)
		return
	}

	err = db.Model(&bus_models.SmsTask{}).
		Where("id = ?", taskID).
		Updates(map[string]interface{}{
			"status":             2,
			"channel_fail_count": failCount,
		}).Error
	if err != nil {
		log.Printf("❌ 更新任务状态失败: task_id=%d, err=%v\n", taskID, err)
		return
	}

	log.Printf("🎉 任务 task_id=%d 已全部完成!失败短信总数=%d\n", taskID, failCount)
}
// sendSms simulates sending one SMS; production code is expected to call the
// real SMS channel here. It sleeps for 10ms and then reports success unless a
// random draw out of four values comes up 0 (about a 25% failure rate).
// Neither phone nor content is inspected by the simulation.
func sendSms(phone, content string) bool {
	// Placeholder for a real SMS gateway SDK call.
	const simulatedLatency = 10 * time.Millisecond
	time.Sleep(simulatedLatency)

	failed := rand.Intn(4) == 0
	return !failed
}