package bus_service import ( "bytes" "fmt" "github.com/go-admin-team/go-admin-core/sdk/service" "github.com/xuri/excelize/v2" "go-admin/app/admin/models/bus_models" "go-admin/common/redisx" "golang.org/x/net/context" "gorm.io/gorm" "math/rand" "strconv" "time" ) const ( maxPhonesPerShard = 10000 cacheExpire = time.Hour ) type SmsService struct { service.Service } // ReadExcelFile 读取 Excel 文件并提取第一列的手机号 func (s *SmsService) ReadExcelFile(file []byte) ([]string, error) { // 使用 excelize.OpenReader 直接从文件流读取内容 f, err := excelize.OpenReader(bytes.NewReader(file)) if err != nil { return nil, fmt.Errorf("无法打开 Excel 文件: %v", err) } var phones []string // 获取所有工作表列表 sheetList := f.GetSheetList() if len(sheetList) == 0 { return nil, fmt.Errorf("excel 文件中没有工作表") } // 选择第一个工作表 sheet := sheetList[0] rows, err := f.GetRows(sheet) if err != nil { return nil, fmt.Errorf("读取 Excel 行失败: %v", err) } // 假设第一列是手机号 for _, row := range rows { if len(row) > 0 { phone := row[0] phones = append(phones, phone) } } return phones, nil } func (s *SmsService) CachePhonesByShard(importSerial string, phones []string) error { ctx := context.Background() total := 0 for i := 0; i < len(phones); i += maxPhonesPerShard { end := i + maxPhonesPerShard if end > len(phones) { end = len(phones) } shard := phones[i:end] total++ key := fmt.Sprintf("sms:import:%s:%d", importSerial, total) if err := redisx.Client.RPush(ctx, key, shard).Err(); err != nil { return err } redisx.Client.Expire(ctx, key, cacheExpire) } // 保存总分片数量 totalKey := fmt.Sprintf("sms:import:%s:total", importSerial) err := redisx.Client.Set(ctx, totalKey, total, cacheExpire).Err() return err } func (s *SmsService) AppendPhonesToRedis(serial string, phones []string) error { ctx := context.Background() // 拿当前最大分片号 totalKey := fmt.Sprintf("sms:import:%s:total", serial) totalStr, err := redisx.Client.Get(ctx, totalKey).Result() if err != nil { return fmt.Errorf("无法读取缓存分片: %v", err) } total, _ := strconv.Atoi(totalStr) if total == 0 { return fmt.Errorf("缓存分片不存在") } // 直接往最后一片中添加(你也可以按量分片再扩展) lastKey := fmt.Sprintf("sms:import:%s:%d", serial, total) values := make([]interface{}, len(phones)) for i, p := range phones { values[i] = p } err = redisx.Client.RPush(ctx, lastKey, values...).Err() return err } func (s *SmsService) SubmitSmsTaskFromRedis(ctx context.Context, serial string, content string) error { // 读取分片数量 totalKey := fmt.Sprintf("sms:import:%s:total", serial) totalStr, err := redisx.Client.Get(ctx, totalKey).Result() if err != nil { return fmt.Errorf("获取缓存分片数失败: %v", err) } total, _ := strconv.Atoi(totalStr) if total == 0 { return fmt.Errorf("分片数为 0,无法创建任务") } // 统计总手机号数量 var totalPhones int phoneCounts := make([]int, total) for i := 1; i <= total; i++ { redisKey := fmt.Sprintf("sms:import:%s:%d", serial, i) count, err := redisx.Client.LLen(ctx, redisKey).Result() if err != nil { return fmt.Errorf("读取缓存分片 %d 失败: %v", i, err) } totalPhones += int(count) phoneCounts[i-1] = int(count) } if totalPhones == 0 { return fmt.Errorf("手机号为空,不能创建任务") } // 计算短信条数(按70字分割) // 短信条数计算:70字以内算1条,超过则每67字拆1条(长短信按67字分段) contentCost := (len([]rune(content)) + 66) / 67 totalSmsCount := totalPhones * contentCost // 使用数据库事务 return s.Orm.WithContext(ctx).Transaction(func(tx *gorm.DB) error { // 生成批次ID batchID, _ := GenerateBatchID(s.Orm) // 插入任务 task := &bus_models.SmsTask{ CooperativeNumber: "", // 可根据需要填充 CooperativeName: "", BatchID: batchID, ImportID: serial, SmsContent: content, SmsContentCost: contentCost, TotalPhoneCount: totalPhones, TotalSmsCount: totalSmsCount, Status: 0, InterceptFailCount: 0, ChannelFailCount: 0, } if err := tx.Create(task).Error; err != nil { return fmt.Errorf("创建短信任务失败: %v", err) } // 插入批次记录 for i := 1; i <= total; i++ { batch := &bus_models.SmsTaskBatch{ TaskID: task.ID, BatchID: batchID, ImportID: serial, Num: i, PhoneCount: phoneCounts[i-1], SmsCount: phoneCounts[i-1] * contentCost, Status: 0, } if err := tx.Create(batch).Error; err != nil { return fmt.Errorf("创建批次失败: %v", err) } } // ✅ 提示:不在这里生成 SmsSendRecord,后续通过消费者从 Redis 中读取批次生成 // 否则数据量大时会影响事务和响应速度 return nil }) } // GenerateBatchID 生成唯一的批次ID:日期(YYYYMMDD)+ 8位随机数(共16位) func GenerateBatchID(db *gorm.DB) (string, error) { // 获取当前日期(年月日) dateStr := time.Now().Format("20060102") // 例如:20250411 // 生成一个8位随机数(范围:00000000 ~ 99999999) rand.Seed(time.Now().UnixNano()) randomNum := rand.Int63n(100000000) // 8位最大是1亿,即10^8 // 格式化为8位字符串(左侧补0) randomNumStr := fmt.Sprintf("%08d", randomNum) // 拼接成 batch_id batchID := fmt.Sprintf("%s%s", dateStr, randomNumStr) // 检查 sms_task 表中是否已存在该 batch_id var count int64 err := db.Table("sms_task").Where("batch_id = ?", batchID).Count(&count).Error if err != nil { return "", err } // 如果已存在,则递归重试 if count > 0 { return GenerateBatchID(db) } return batchID, nil }