监控mongo集合中的某个字段

需求

监控deviceAccess数据库migration集合,查询半个小时内"migrationFlag": "failed"的数据,半个小时执行一次。如果有failed的数据发送告警消息。

实现

migration.go

package main

import (
        "context"
        "crypto/hmac"
        "crypto/sha256"
        "encoding/base64"
        "encoding/json"
        "fmt"
        "io"
        "net/http"
        "net/url"
        "os"
        "strings"
        "time"

        "go.mongodb.org/mongo-driver/bson"
        "go.mongodb.org/mongo-driver/mongo"
        "go.mongodb.org/mongo-driver/mongo/options"
)

const (
        dingdingWebhook = "https://oapi.dingtalk.com/robot/send?access_token=xxx"
        dingdingSecret  = "xxx"
        // URL编码密码中的特殊字符 @ -> %40
        mongoURI  = "mongodb://root:root@1.2.3.4:3717/admin"
        beijingTZ = "Asia/Shanghai"
)

type MigrationRecord struct {
        DeviceID      string    `bson:"deviceId"`
        ToCompanyID   int64     `bson:"toCompanyId"`
        ToStoreID     int64     `bson:"toStoreId"`
        MigrationFlag string    `bson:"migrationFlag"`
        CreateTime    time.Time `bson:"createTime"`
}

type DingMsg struct {
        Msgtype  string `json:"msgtype"`
        Markdown struct {
                Title string `json:"title"`
                Text  string `json:"text"`
        } `json:"markdown"`
}

func main() {
        // 加载北京时区
        beijingLoc, err := time.LoadLocation(beijingTZ)
        if err != nil {
                fmt.Printf("无法加载时区 '%s': %v\n", beijingTZ, err)
                os.Exit(1)
        }

        // 记录开始时间(北京时间)
        startTime := time.Now().In(beijingLoc)
        fmt.Printf("[%s] 开始检查迁移失败记录\n", startTime.Format("2006-01-02 15:04:05"))

        // 设置时间范围:最近30分钟(使用UTC时间)
        // 数据库中的时间是UTC,所以阈值也要用UTC时间
        utcThreshold := time.Now().Add(-30 * time.Minute).UTC()

        // 检查迁移失败记录
        failures, err := checkRecentMigrationFailures(utcThreshold)
        if err != nil {
                fmt.Printf("错误: %v\n", err)
                os.Exit(1)
        }

        // 如果有失败记录,发送钉钉通知
        if len(failures) > 0 {
                msg := buildDingMessage(failures, utcThreshold, beijingLoc)
                if err := sendHuachaMessage(msg); err != nil {
                        fmt.Printf("发送钉钉消息失败: %v\n", err)
                        os.Exit(1)
                }
        }

        // 记录结束时间(北京时间)
        endTime := time.Now().In(beijingLoc)
        duration := endTime.Sub(startTime)
        fmt.Printf("[%s] 检查完成, 耗时: %v, 发现失败记录: %d\n",
                endTime.Format("2006-01-02 15:04:05"),
                duration,
                len(failures))
}

func checkRecentMigrationFailures(threshold time.Time) ([]MigrationRecord, error) {
        // 连接MongoDB
        client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(mongoURI))
        if err != nil {
                return nil, fmt.Errorf("连接MongoDB失败: %w", err)
        }
        defer func() {
                if err := client.Disconnect(context.Background()); err != nil {
                        fmt.Printf("关闭MongoDB连接失败: %v\n", err)
                }
        }()

        db := client.Database("deviceAccess")
        collection := db.Collection("migration")

        // 构建查询条件:最近30分钟内迁移失败的记录
        filter := bson.M{
                "migrationFlag": "failed",
                "$or": []bson.M{
                        {"createdTime": bson.M{"$gte": threshold}},
                },
        }

        fmt.Printf("查询条件: 迁移失败且更新时间在 %s 之后\n", threshold.Format("2006-01-02 15:04:05"))

        cur, err := collection.Find(context.Background(), filter)
        if err != nil {
                return nil, fmt.Errorf("查询失败: %w", err)
        }
        defer cur.Close(context.Background())

        var failures []MigrationRecord
        for cur.Next(context.Background()) {
                var record MigrationRecord
                if err := cur.Decode(&record); err != nil {
                        fmt.Printf("解析记录失败: %v\n", err)
                        continue
                }
                failures = append(failures, record)
        }

        if err := cur.Err(); err != nil {
                return nil, fmt.Errorf("游标错误: %w", err)
        }

        return failures, nil
}

func buildDingMessage(records []MigrationRecord, utcThreshold time.Time, beijingLoc *time.Location) string {
        var builder strings.Builder

        // 将UTC时间转换为北京时间
        beijingThreshold := utcThreshold.In(beijingLoc)
        beijingNow := time.Now().In(beijingLoc)

        builder.WriteString("### 迁移失败告警 (最近30分钟)\n")
        builder.WriteString(fmt.Sprintf("**检测时间(北京时间)**: %s\n", beijingNow.Format("2006-01-02 15:04:05")))
        builder.WriteString(fmt.Sprintf("**时间范围**: %s (北京时间) 之后\n", beijingThreshold.Format("2006-01-02 15:04:05")))
        builder.WriteString(fmt.Sprintf("**失败数量**: %d\n\n", len(records)))
        builder.WriteString("#### 失败设备列表:\n")

        for i, r := range records {
                builder.WriteString(fmt.Sprintf("%d. DeviceID: `%s`\n", i+1, r.DeviceID))
                builder.WriteString(fmt.Sprintf("   ToCompanyID: %d\n", r.ToCompanyID))
                builder.WriteString(fmt.Sprintf("   ToStoreID: %d\n", r.ToStoreID))

                // 显示更新时间(如果可用)
                if !r.CreateTime.IsZero() {
                        beijingCreated := r.CreateTime.In(beijingLoc)
                        builder.WriteString(fmt.Sprintf("   - 创建时间: %s (北京时间)\n\n", beijingCreated.Format("2006-01-02 15:04:05")))
                } else {
                        builder.WriteString("时间: 未知\n\n")
                }
        }

        builder.WriteString("> 请及时处理!")
        return builder.String()
}

func sendHuachaMessage(message string) error {
        timestamp := time.Now().UnixNano() / 1e6
        sign := generateSign(timestamp)

        // 构建完整的webhook URL
        webhookURL := fmt.Sprintf("%s&timestamp=%d&sign=%s",
                dingdingWebhook,
                timestamp,
                url.QueryEscape(sign))

        msg := DingMsg{
                Msgtype: "markdown",
        }
        msg.Markdown.Title = "设备迁移失败告警 (最近30分钟)"
        msg.Markdown.Text = message

        jsonData, err := json.Marshal(msg)
        if err != nil {
                return fmt.Errorf("JSON编码失败: %w", err)
        }

        resp, err := http.Post(webhookURL, "application/json", strings.NewReader(string(jsonData)))
        if err != nil {
                return fmt.Errorf("HTTP请求失败: %w", err)
        }
        defer resp.Body.Close()

        if resp.StatusCode != http.StatusOK {
                body, _ := io.ReadAll(resp.Body)
                return fmt.Errorf("钉钉返回错误状态: %d, 响应: %s", resp.StatusCode, string(body))
        }

        return nil
}

func generateSign(timestamp int64) string {
        stringToSign := fmt.Sprintf("%d\n%s", timestamp, dingdingSecret)
        h := hmac.New(sha256.New, []byte(dingdingSecret))
        h.Write([]byte(stringToSign))
        return base64.StdEncoding.EncodeToString(h.Sum(nil))
}

运行

go build -o migration_monitor migration.go
#添加定时任务
*/10 * * * * /root/go_script/migration_monitor >> /root/go_script/migration_monitor.log 2>&1

file

0 0 投票数
文章评分
订阅评论
提醒
guest

0 评论
最旧
最新 最多投票
内联反馈
查看所有评论

相关文章

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部
0
希望看到您的想法,请您发表评论x