telco_server/common/sms_scheduler/scheduler.go

190 lines
4.5 KiB
Go
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package sms_scheduler
import (
"fmt"
"go-admin/app/admin/models/bus_models"
"go-admin/common/database"
"go-admin/common/redisx"
"golang.org/x/net/context"
"log"
"math/rand"
"sync"
"time"
"gorm.io/gorm"
)
var db *gorm.DB // 需要先初始化你的 GORM DB
const (
maxRetry = 3
workerPoolSize = 5
)
func ProcessPendingBatches() {
log.Println("📡 [Scheduler] 扫描待发送批次...")
db = database.Db
batches := fetchPendingBatches()
if len(batches) == 0 {
log.Println(" 无待发批次")
return
}
wg := sync.WaitGroup{}
sem := make(chan struct{}, workerPoolSize)
for _, batch := range batches {
wg.Add(1)
sem <- struct{}{} // 限制并发
go func(batch bus_models.SmsTaskBatch) {
defer wg.Done()
defer func() { <-sem }()
err := lockAndProcessBatch(batch)
if err != nil {
log.Printf("❌ 批次处理失败: batch_id=%s, err=%v\n", batch.BatchID, err)
}
}(batch)
}
wg.Wait()
}
func fetchPendingBatches() []bus_models.SmsTaskBatch {
var batches []bus_models.SmsTaskBatch
db.Where("status = ? AND (schedule_time IS NULL OR schedule_time <= ?)", 0, time.Now()).
Limit(10).
Find(&batches)
return batches
}
func lockAndProcessBatch(batch bus_models.SmsTaskBatch) error {
tx := db.Begin()
defer func() {
if r := recover(); r != nil {
tx.Rollback()
}
}()
// 锁定批次
res := tx.Model(&bus_models.SmsTaskBatch{}).
Where("id = ? AND status = 0", batch.ID).
Update("status", 1) // 标记发送中
if res.RowsAffected == 0 {
tx.Rollback()
return nil // 已被其他任务抢占
}
tx.Commit()
log.Printf("🚀 开始处理批次batch_id=%s\n", batch.BatchID)
err := processBatchPhones(batch)
if err != nil {
// 更新失败状态
db.Model(&bus_models.SmsTaskBatch{}).Where("id = ?", batch.ID).Update("status", 3)
return err
}
// 更新成功状态
db.Model(&bus_models.SmsTaskBatch{}).Where("id = ?", batch.ID).Update("status", 1)
updateTaskStatusIfFinished(batch.TaskID)
return nil
}
func processBatchPhones(batch bus_models.SmsTaskBatch) error {
ctx := context.Background()
importID := batch.ImportID
num := batch.Num
key := fmt.Sprintf("sms:import:%s:%d", importID, num)
phones, err := redisx.Client.LRange(ctx, key, 0, -1).Result()
if err != nil {
log.Printf("❌ 获取手机号缓存失败:%v\n", err)
return err
}
if len(phones) == 0 {
log.Printf("⚠️ 无手机号可处理import_id=%s, num=%d\n", importID, num)
return nil
}
var recordsToInsert []bus_models.SmsSendRecord
for _, phone := range phones {
// 避免重复发送:判断该手机号是否已存在记录
var count int64
db.Model(&bus_models.SmsSendRecord{}).
Where("batch_id = ? AND phone = ?", batch.BatchID, phone).
Count(&count)
if count > 0 {
continue
}
// 实际发送短信
success := sendSms(phone, batch.SmsContent)
status := 1 // 成功
if !success {
status = 2 // 失败
}
record := bus_models.SmsSendRecord{
CooperativeNumber: batch.CooperativeNumber,
CooperativeName: batch.CooperativeName,
TaskID: batch.TaskID,
TaskBatchID: batch.ID,
Phone: phone,
BatchID: batch.BatchID,
SmsContent: batch.SmsContent,
Status: status,
}
recordsToInsert = append(recordsToInsert, record)
log.Printf("📨 发送短信:%s状态=%d\n", phone, status)
}
// 批量插入发送记录
if len(recordsToInsert) > 0 {
if err := db.Create(&recordsToInsert).Error; err != nil {
log.Printf("❌ 批量写入发送记录失败:%v\n", err)
return err
}
}
return nil
}
func updateTaskStatusIfFinished(taskID uint64) {
var pendingCount int64
db.Model(&bus_models.SmsTaskBatch{}).
Where("task_id = ? AND status = 0", taskID).
Count(&pendingCount)
if pendingCount > 0 {
return // 尚有批次未完成,直接返回
}
// 统计该任务所有失败短信数status = 2
var failCount int64
db.Model(&bus_models.SmsSendRecord{}).
Where("task_id = ? AND status = 2", taskID).
Count(&failCount)
// 更新任务状态为已完成,并记录失败条数
db.Model(&bus_models.SmsTask{}).
Where("id = ?", taskID).
Updates(map[string]interface{}{
"status": 2,
"channel_fail_count": failCount,
})
log.Printf("🎉 任务 task_id=%d 已全部完成!失败短信总数=%d\n", taskID, failCount)
}
// 模拟发送逻辑(生产环境接通短信通道)
func sendSms(phone, content string) bool {
// 可接入真实短信发送SDK
time.Sleep(10 * time.Millisecond)
return rand.Intn(4) != 0 // 25% 失败
}