需求
监控 deviceAccess 数据库的 migration 集合：每半小时执行一次，查询最近半小时内 "migrationFlag": "failed" 的记录；如果存在失败记录，则发送钉钉告警消息。
实现
migration.go
package main
import (
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// DingTalk robot endpoint and the secret used for request signing
// (the "xxx" values are placeholders to be replaced before deployment).
const (
	dingdingWebhook = "https://oapi.dingtalk.com/robot/send?access_token=xxx"
	dingdingSecret  = "xxx"
)

// mongoURI is the MongoDB connection string. Special characters in the
// password must be URL-encoded (for example '@' becomes %40).
const mongoURI = "mongodb://root:root@1.2.3.4:3717/admin"

// beijingTZ is the IANA time zone used when rendering times for humans.
const beijingTZ = "Asia/Shanghai"
// MigrationRecord is one document of the deviceAccess.migration collection,
// decoded through the bson field names given in the struct tags.
type MigrationRecord struct {
	// DeviceID is read from the "deviceId" field.
	DeviceID string `bson:"deviceId"`
	// ToCompanyID is read from "toCompanyId" and listed in the alert text.
	ToCompanyID int64 `bson:"toCompanyId"`
	// ToStoreID is read from "toStoreId" and listed in the alert text.
	ToStoreID int64 `bson:"toStoreId"`
	// MigrationFlag is read from "migrationFlag"; the monitor looks for "failed".
	MigrationFlag string `bson:"migrationFlag"`
	// CreateTime is read from "createTime"; it stays the zero time when the
	// document has no such field.
	CreateTime time.Time `bson:"createTime"`
}
// DingMsg is the JSON body posted to the DingTalk robot webhook for a
// markdown message.
type DingMsg struct {
	// Msgtype selects the message kind; this program always sends "markdown".
	Msgtype string `json:"msgtype"`
	// Markdown carries the message payload.
	Markdown struct {
		// Title is the summary shown in the notification.
		Title string `json:"title"`
		// Text is the markdown body.
		Text string `json:"text"`
	} `json:"markdown"`
}
// main performs one monitoring pass and is intended to be started by cron.
// It looks for migration records that failed within the last 30 minutes and,
// when any are found, posts an alert to the DingTalk robot. It exits with
// status 1 when the query or the notification fails.
func main() {
	// Beijing time is only used to render log lines and the alert text. When the
	// host has no zoneinfo database, fall back to a fixed UTC+8 zone (Asia/Shanghai
	// does not observe DST) instead of aborting the whole check.
	beijingLoc, err := time.LoadLocation(beijingTZ)
	if err != nil {
		fmt.Printf("无法加载时区 '%s': %v, 使用固定时区 UTC+8\n", beijingTZ, err)
		beijingLoc = time.FixedZone("CST", 8*60*60)
	}

	// Use a single reference instant so the logged start time and the query
	// window are derived from the same moment.
	now := time.Now()
	startTime := now.In(beijingLoc)
	fmt.Printf("[%s] 开始检查迁移失败记录\n", startTime.Format("2006-01-02 15:04:05"))

	// Query window: the last 30 minutes. Database timestamps are UTC, so the
	// threshold is expressed in UTC as well.
	utcThreshold := now.Add(-30 * time.Minute).UTC()

	failures, err := checkRecentMigrationFailures(utcThreshold)
	if err != nil {
		fmt.Printf("错误: %v\n", err)
		os.Exit(1)
	}

	// Only notify when there is something to report.
	if len(failures) > 0 {
		msg := buildDingMessage(failures, utcThreshold, beijingLoc)
		if err := sendHuachaMessage(msg); err != nil {
			fmt.Printf("发送钉钉消息失败: %v\n", err)
			os.Exit(1)
		}
	}

	endTime := time.Now().In(beijingLoc)
	fmt.Printf("[%s] 检查完成, 耗时: %v, 发现失败记录: %d\n",
		endTime.Format("2006-01-02 15:04:05"),
		endTime.Sub(startTime),
		len(failures))
}
// checkRecentMigrationFailures connects to MongoDB and returns the documents in
// deviceAccess.migration whose migrationFlag is "failed" and whose creation
// timestamp is at or after threshold. Documents that cannot be decoded are
// logged and skipped; connection, query and cursor errors are returned.
func checkRecentMigrationFailures(threshold time.Time) ([]MigrationRecord, error) {
	// Bound connect + query so a stuck run cannot overlap the next cron run.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongoURI))
	if err != nil {
		return nil, fmt.Errorf("连接MongoDB失败: %w", err)
	}
	defer func() {
		// Use a fresh context: ctx may already have expired by the time we get here.
		dctx, dcancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer dcancel()
		if err := client.Disconnect(dctx); err != nil {
			fmt.Printf("关闭MongoDB连接失败: %v\n", err)
		}
	}()

	collection := client.Database("deviceAccess").Collection("migration")

	// The timestamp field name is ambiguous: MigrationRecord decodes "createTime",
	// while this query previously filtered on "createdTime". Match either
	// spelling so failures are not silently missed.
	// NOTE(review): confirm which field the documents actually carry, then drop
	// the other clause (and align the MigrationRecord bson tag if needed).
	filter := bson.M{
		"migrationFlag": "failed",
		"$or": []bson.M{
			{"createTime": bson.M{"$gte": threshold}},
			{"createdTime": bson.M{"$gte": threshold}},
		},
	}
	fmt.Printf("查询条件: 迁移失败且创建时间在 %s 之后\n", threshold.Format("2006-01-02 15:04:05"))

	cur, err := collection.Find(ctx, filter)
	if err != nil {
		return nil, fmt.Errorf("查询失败: %w", err)
	}
	defer cur.Close(ctx)

	var failures []MigrationRecord
	for cur.Next(ctx) {
		var record MigrationRecord
		if err := cur.Decode(&record); err != nil {
			// Best effort: one malformed document must not suppress the alert for the rest.
			fmt.Printf("解析记录失败: %v\n", err)
			continue
		}
		failures = append(failures, record)
	}
	if err := cur.Err(); err != nil {
		return nil, fmt.Errorf("游标错误: %w", err)
	}
	return failures, nil
}
// buildDingMessage renders the failed migration records as the markdown text of
// a DingTalk alert. utcThreshold is the start of the query window. All times are
// shown in beijingLoc; a nil location falls back to UTC instead of panicking.
func buildDingMessage(records []MigrationRecord, utcThreshold time.Time, beijingLoc *time.Location) string {
	const layout = "2006-01-02 15:04:05"
	if beijingLoc == nil {
		beijingLoc = time.UTC
	}

	var builder strings.Builder
	builder.WriteString("### 迁移失败告警 (最近30分钟)\n")
	fmt.Fprintf(&builder, "**检测时间(北京时间)**: %s\n", time.Now().In(beijingLoc).Format(layout))
	fmt.Fprintf(&builder, "**时间范围**: %s (北京时间) 之后\n", utcThreshold.In(beijingLoc).Format(layout))
	fmt.Fprintf(&builder, "**失败数量**: %d\n\n", len(records))
	builder.WriteString("#### 失败设备列表:\n")

	for i, r := range records {
		fmt.Fprintf(&builder, "%d. DeviceID: `%s`\n", i+1, r.DeviceID)
		fmt.Fprintf(&builder, " ToCompanyID: %d\n", r.ToCompanyID)
		fmt.Fprintf(&builder, " ToStoreID: %d\n", r.ToStoreID)

		// A zero CreateTime means the document carried no creation time; render it
		// on the same " - 创建时间:" line so every entry has the same layout.
		created := "未知"
		if !r.CreateTime.IsZero() {
			created = r.CreateTime.In(beijingLoc).Format(layout) + " (北京时间)"
		}
		fmt.Fprintf(&builder, " - 创建时间: %s\n\n", created)
	}

	builder.WriteString("> 请及时处理!")
	return builder.String()
}
// sendHuachaMessage posts message to the DingTalk robot as a markdown alert,
// signing the request with dingdingSecret. It returns an error when the request
// cannot be sent, when DingTalk answers with a non-200 status, or when the JSON
// reply carries a non-zero errcode.
func sendHuachaMessage(message string) error {
	timestamp := time.Now().UnixNano() / 1e6 // milliseconds, as used for the signature
	sign := generateSign(timestamp)

	// dingdingWebhook already contains "?access_token=...", so the signature
	// parameters are appended with '&'.
	webhookURL := fmt.Sprintf("%s&timestamp=%d&sign=%s",
		dingdingWebhook,
		timestamp,
		url.QueryEscape(sign))

	msg := DingMsg{Msgtype: "markdown"}
	msg.Markdown.Title = "设备迁移失败告警 (最近30分钟)"
	msg.Markdown.Text = message

	jsonData, err := json.Marshal(msg)
	if err != nil {
		return fmt.Errorf("JSON编码失败: %w", err)
	}

	// http.Post uses http.DefaultClient, which has no timeout; a stalled
	// connection would otherwise hang the cron job indefinitely.
	client := &http.Client{Timeout: 15 * time.Second}
	resp, err := client.Post(webhookURL, "application/json", strings.NewReader(string(jsonData)))
	if err != nil {
		return fmt.Errorf("HTTP请求失败: %w", err)
	}
	defer resp.Body.Close()

	body, readErr := io.ReadAll(resp.Body)
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("钉钉返回错误状态: %d, 响应: %s", resp.StatusCode, string(body))
	}
	if readErr != nil {
		return fmt.Errorf("读取钉钉响应失败: %w", readErr)
	}

	// DingTalk reports failures such as an invalid signature with HTTP 200 and a
	// non-zero errcode in the body, so the status code alone is not enough.
	var reply struct {
		ErrCode int    `json:"errcode"`
		ErrMsg  string `json:"errmsg"`
	}
	if err := json.Unmarshal(body, &reply); err != nil {
		return fmt.Errorf("解析钉钉响应失败: %w, 响应: %s", err, string(body))
	}
	if reply.ErrCode != 0 {
		return fmt.Errorf("钉钉返回错误: errcode=%d, errmsg=%s", reply.ErrCode, reply.ErrMsg)
	}
	return nil
}
// generateSign returns the DingTalk robot request signature for timestamp:
// base64(HMAC-SHA256(key = dingdingSecret, data = "<timestamp>\n<dingdingSecret>")).
func generateSign(timestamp int64) string {
	mac := hmac.New(sha256.New, []byte(dingdingSecret))
	// Writes to a hash.Hash never fail, so the Fprintf result is deliberately ignored.
	_, _ = fmt.Fprintf(mac, "%d\n%s", timestamp, dingdingSecret)
	digest := mac.Sum(nil)
	return base64.StdEncoding.EncodeToString(digest)
}
运行
go build -o migration_monitor migration.go
#添加定时任务(每30分钟执行一次,与程序中30分钟的查询窗口保持一致,避免同一失败记录被重复告警)
*/30 * * * * /root/go_script/migration_monitor >> /root/go_script/migration_monitor.log 2>&1
