monstache监控告警自愈

背景

我的环境里使用了monstache来进行mongodb和es的同步,但是monstache有时候会假死,这就会导致pod状态正常,但实际上同步已经停止了,日志又看不到,所以需要给monstache加一个监控,当monstache发生假死时发送告警消息,然后自动删除假死的pod,达到自愈的效果。

官方文档:https://rwynn.github.io/monstache-site/config

仓库地址:https://github.com/rwynn/monstache

使用文档:https://help.aliyun.com/zh/es/use-cases/use-monstache-to-synchronize-data-from-mongodb-to-alibaba-cloud-elasticsearch-in-real-time#context-bcj-kq2-lb3

实现

1.开启http-server

首先修改配置文件config.toml,k8s内即monstache-config的configmap。

enable-http-server=true

2.开启日志

[logs]
info = "/var/log/es/info.log"
warn = "/var/log/es/wran.log"
error = "/var/log/es/error.log"
trace = "/var/log/es/trace.log"

只配置这个不生效,需要配置MONSTACHE_LOG_DIR环境变量。

file

env:MONSTACHE_LOG_DIR=/var/log/es

file

重启pod,在/var/log/es目录中就会有info.log,error.log,stats.log,warn.log。

3.配置服务

apiVersion: v1
kind: Service
metadata:
  name: monstache
  namespace: auto
spec:
  ports:
  - name: monstache
    port: 8080
    protocol: TCP
    targetPort: 8080
  selector:
    app: monstache
  type: ClusterIP

file

通过判断Succeeded数值在2分钟内是否发生变化来判断monstache服务是否发生了假死。,如果发生了假死发送消息通知到花茶,并删除假死的pod。

4.脚本实现

monstache.go,修改MongoURI和钉钉DingdingWebhook,Secret,还有CheckHealth和sendAlert函数中的手机号。

package main

import (
        "bytes"
        "context"
        "crypto/hmac"
        "crypto/sha256"
        "encoding/base64"
        "encoding/json"
        "fmt"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promauto"
        "github.com/prometheus/client_golang/prometheus/promhttp"
        "go.mongodb.org/mongo-driver/bson"
        "go.mongodb.org/mongo-driver/mongo"
        "go.mongodb.org/mongo-driver/mongo/options"
        "io"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
        "log"
        "net/http"
        "net/url"
        "os"
        "strconv"
        "sync"
        "time"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// 配置项(建议通过环境变量注入)
var (
        MongoURI        = "mongodb://root:root@1.2.3.4:3717/"
        MongoDBName     = "monstache"
        MongoCollection = "cluster"
        DingdingWebhook = "https://oapi.dingtalk.com/robot/send?access_token=xxx"
        Secret          = "xxx" // 钉钉加签密钥
        CheckInterval   = 10 * time.Minute                                                      // 检查间隔
        StuckThreshold  = 3 * time.Minute                                                       // 假死判定阈值
        PrometheusPort  = ":6666"                                                               // 指标暴露端口
        mongoClient     *mongo.Client
        k8sClient       *kubernetes.Clientset
        // 健康状态metrics指标(1=正常,0=异常)
        healthStatusGauge = promauto.NewGauge(prometheus.GaugeOpts{
                Name: "monstache_health_status",
                Help: "Monstache health status (1=healthy, 0=unhealthy)",
        })
)

type Monitor struct {
        mu           sync.RWMutex
        currentPodIP string
        lastValue    int64
        lastChange   time.Time
        currentAlarm bool
        lastIPUpdate time.Time
}

type MonstacheStats struct {
        Succeeded int64 `json:"Succeeded"`
}

// 配置日志
func setupLogging() (*os.File, error) {
        logFile, err := os.OpenFile("monstache.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
        if err != nil {
                return nil, fmt.Errorf("无法打开日志文件: %w", err)
        }
        log.SetOutput(logFile)
        log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
        return logFile, nil
}

func initMongoDB() error {
        client, err := mongo.Connect(context.Background(),
                options.Client().ApplyURI(MongoURI).
                        SetConnectTimeout(10*time.Second).
                        SetServerSelectionTimeout(30*time.Second))
        if err != nil {
                return err
        }

        // 验证连接
        if err := client.Ping(context.Background(), nil); err != nil {
                return fmt.Errorf("MongoDB 连接测试失败: %w", err)
        }
        mongoClient = client
        fmt.Println("MongoDB连接成功")
        return nil
}

func initK8SClient() error {
        var kubeConfig = "/root/.kube/config"
        // 初始化 Kubernetes 客户端
        config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
        if err != nil {
                return fmt.Errorf("初始化集群配置失败: %w", err)
        }

        client, err := kubernetes.NewForConfig(config)
        if err != nil {
                return fmt.Errorf("创建客户端失败: %w", err)
        }
        k8sClient = client
        fmt.Println("Kubernetes 连接成功")
        return nil
}

// UpdatePodIP Pod IP 管理
func (m *Monitor) UpdatePodIP() error {
        // 1. 从 MongoDB 获取 Pod 名称
        podName, err := m.getPodNameFromMongo()
        if err != nil {
                return fmt.Errorf("获取 Pod 名称失败: %w", err)
        }

        // 2. 从 Kubernetes 获取 Pod IP
        podIP, err := m.getPodIP(podName)
        if err != nil {
                return fmt.Errorf("获取 Pod IP 失败: %w", err)
        }
        fmt.Printf("当前 Pod Name, IP: %s %s\n", podName, podIP)

        // 3. 更新状态
        m.mu.Lock()
        defer m.mu.Unlock()

        if m.currentPodIP != podIP {
                log.Printf("检测到 Pod IP 变更: %s -> %s", m.currentPodIP, podIP)
                fmt.Printf("检测到 Pod IP 变更: %s -> %s\n", m.currentPodIP, podIP)
                m.currentPodIP = podIP
        }
        m.lastIPUpdate = time.Now()
        return nil
}

func (m *Monitor) getPodNameFromMongo() (string, error) {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()

        collection := mongoClient.Database(MongoDBName).Collection(MongoCollection)
        var result struct {
                Host string `bson:"host"`
        }

        err := collection.FindOne(ctx, bson.M{}).Decode(&result)
        if err != nil {
                return "", fmt.Errorf("查询失败: %w", err)
        }

        if result.Host == "" {
                return "", fmt.Errorf("host 字段为空")
        }
        return result.Host, nil
}

func (m *Monitor) getPodIP(podName string) (string, error) {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()

        pod, err := k8sClient.CoreV1().Pods("firecloud").Get(ctx, podName, metav1.GetOptions{})
        if err != nil {
                return "", fmt.Errorf("获取 Pod 信息失败: %w", err)
        }

        if pod.Status.PodIP == "" {
                return "", fmt.Errorf("pod 尚未分配 IP")
        }
        return pod.Status.PodIP, nil
}

// 根据 Pod IP 获取 Pod 名称
func (m *Monitor) getPodNameByIP(podIP string) (string, error) {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()

        // 遍历命名空间中的所有 Pods,找到 IP 匹配的 Pod
        pods, err := k8sClient.CoreV1().Pods("firecloud").List(ctx, metav1.ListOptions{})
        if err != nil {
                return "", fmt.Errorf("获取 Pod 信息失败: %w", err)
        }

        for _, pod := range pods.Items {
                if pod.Status.PodIP == podIP {
                        fmt.Printf("pod name: %v\n", pod.Name)
                        return pod.Name, nil
                }
        }

        return "", fmt.Errorf("未找到对应 IP 的 Pod: %s", podIP)
}

// NewMonitor 修改 Monitor 的构造函数,加载告警状态
func NewMonitor() (*Monitor, error) {
        status, err := loadAlertStatus()
        if err != nil {
                return nil, fmt.Errorf("加载告警状态失败: %w", err)
        }

        return &Monitor{
                currentAlarm: status,
        }, nil
}

// CheckHealth 核心健康检查逻辑
func (m *Monitor) CheckHealth() error {
        m.mu.RLock()
        currentIP := m.currentPodIP
        m.mu.RUnlock()

        if currentIP == "" {
                return fmt.Errorf("当前没有有效的 Pod IP")
        }

        // 构建动态 URL
        monstacheURL := fmt.Sprintf("http://%s:8080/stats", currentIP)

        value, err := getSucceededValue(monstacheURL)
        if err != nil {
                updateHealthStatus(0)
                return fmt.Errorf("获取状态失败: %w", err)
        }

        now := time.Now()
        healthStatus := 1 // 默认为健康

        // 判断逻辑
        if m.lastValue == value {
                // 值未变化,检查持续时间
                if now.Sub(m.lastChange) > StuckThreshold {
                        healthStatus = 0 // 超过阈值标记为不健康
                }
        } else {
                // 值发生变化,更新时间戳
                m.lastValue = value
                m.lastChange = now
        }

        // 更新指标
        updateHealthStatus(healthStatus)
        fmt.Println("当前健康状态:", healthStatus)

        _, err = NewMonitor()
        if err != nil {
                log.Printf("加载告警状态失败: %v", err)
        }

        // 触发告警逻辑
        if healthStatus == 0 && !m.currentAlarm {
                fmt.Println("发送假死告警")
                // 发送假死告警
                sendAlert("monstache 服务假死!Succeeded 值长期未变化", []string{"12345678900", "12345678901"})
                m.currentAlarm = true
                // 保存当前告警状态
                if err := saveAlertStatus(m.currentAlarm); err != nil {
                        log.Printf("保存告警状态失败: %v", err)
                }
                podName, err := m.getPodNameByIP(currentIP)
                if err != nil {
                        log.Printf("根据 IP 获取 Pod 名称失败: %v", err)
                } else {
                        err := m.deletePod(podName)
                        if err != nil {
                                log.Printf("删除 Pod 失败: %v", err)
                        } else {
                                fmt.Printf("Pod %s 已删除", podName)
                                sendAlert("monstache 已重启!请检查是否正常", []string{"12345678900", "12345678901"})
                        }
                }
                fmt.Println("告警状态已保存")
        } else if healthStatus == 1 && m.currentAlarm {
                fmt.Println("发送恢复告警")
                // 发送恢复告警
                sendAlert("monstache 服务恢复!Succeeded 值已更新", []string{"12345678900", "12345678901"})
                m.currentAlarm = false
                // 保存当前告警状态
                if err := saveAlertStatus(m.currentAlarm); err != nil {
                        log.Printf("保存告警状态失败: %v", err)
                }
                fmt.Println("告警状态已保存")
        }

        return nil
}

func (m *Monitor) deletePod(podName string) error {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()

        err := k8sClient.CoreV1().Pods("firecloud").Delete(ctx, podName, metav1.DeleteOptions{})
        if err != nil {
                return fmt.Errorf("删除 Pod 失败: %w", err)
        }
        fmt.Printf("Pod %s 已删除,Kubernetes 将会自动重启该 Pod\n", podName)
        return nil
}

// 统一更新健康状态指标
func updateHealthStatus(status int) {
        healthStatusGauge.Set(float64(status))
        fmt.Println("更新健康状态指标")
}

func getSucceededValue(url string) (int64, error) {
        resp, err := http.Get(url)
        if err != nil {
                return 0, fmt.Errorf("HTTP request failed: %w", err)
        }
        defer resp.Body.Close()

        if resp.StatusCode != http.StatusOK {
                return 0, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
        }

        body, err := io.ReadAll(resp.Body)
        if err != nil {
                return 0, fmt.Errorf("failed to read response: %w", err)
        }

        var stats MonstacheStats
        if err := json.Unmarshal(body, &stats); err != nil {
                return 0, fmt.Errorf("failed to parse JSON: %w", err)
        }
        fmt.Println("Succeeded:", stats.Succeeded)
        return stats.Succeeded, nil
}

func startMetricsServer() {
        http.Handle("/metrics", promhttp.Handler())
        log.Printf("启动指标服务器在 %s", PrometheusPort)
        if err := http.ListenAndServe(PrometheusPort, nil); err != nil {
                log.Fatalf("指标服务器启动失败: %v", err)
        }
}
func sendAlert(message string, mobiles []string) {
        fmt.Printf("发送花茶告警消息: %s", message)
        newMessage := fmt.Sprintf("[Monstache 状态变更]\n%s\n检测时间: %s",
                message, time.Now().Format("2006-01-02 15:04:05"))

        if err := SendDingdingAlert(newMessage, []string{"12345678900", "12345678901"}); err != nil {
                fmt.Printf("钉钉告警发送失败: %v", err)
        }
}

// 保存告警状态
func saveAlertStatus(status bool) error {
        fmt.Println("打开文件 monstache_status.txt")
        file, err := os.OpenFile("monstache_status.txt", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
        if err != nil {
                return fmt.Errorf("创建monstache_status.txt文件失败: %w", err)
        }
        defer file.Close()
        if _, err := file.WriteString(fmt.Sprintf("%v", status)); err != nil {
                return fmt.Errorf("保存告警状态到monstache_status.txt失败: %w", err)
        }
        fmt.Println("告警状态已保存到monstache_status.txt")
        return nil
}

// 读取告警状态
func loadAlertStatus() (bool, error) {
        file, err := os.Open("monstache_status.txt")
        if err != nil {
                if os.IsNotExist(err) {
                        return false, nil
                }
                return false, fmt.Errorf("打开monstache_status.txt文件失败: %w", err)
        }
        defer file.Close()

        var status bool
        _, err = fmt.Fscanf(file, "%t", &status)
        if err != nil {
                return false, fmt.Errorf("读取告警状态失败: %w", err)
        }
        fmt.Printf("读取告警状态成功: %v\n", status)
        return status, nil
}

func SendDingdingAlert(message string, mobiles []string) error {
        // 生成加签参数
        timestamp := time.Now().UnixNano() / 1e6
        sign := genSign(Secret, timestamp)

        // 构造请求URL
        u, _ := url.Parse(DingdingWebhook)
        q := u.Query()
        q.Add("timestamp", strconv.FormatInt(timestamp, 10))
        q.Add("sign", sign)
        u.RawQuery = q.Encode()

        // 构建请求体
        payload := map[string]interface{}{
                "msgtype": "markdown",
                "markdown": map[string]string{
                        "title": "Monstache 服务告警",
                        "text":  "**告警详情**\n" + message + "\n\n**触发时间**: " + time.Now().Format("2006-01-02 15:04:05") + buildAtTags(mobiles),
                },
                "at": map[string]interface{}{
                        "atMobiles": mobiles, // 指定要 @ 的手机号列表
                        "isAtAll":   false,   // 不 @ 所有人
                },
        }

        jsonData, err := json.Marshal(payload)
        if err != nil {
                return fmt.Errorf("failed to marshal payload: %w", err)
        }

        req, err := http.NewRequest("POST", u.String(), bytes.NewBuffer(jsonData))
        if err != nil {
                return fmt.Errorf("failed to create request: %w", err)
        }
        req.Header.Set("Content-Type", "application/json")

        client := &http.Client{Timeout: 5 * time.Second}
        resp, err := client.Do(req)
        if err != nil {
                return fmt.Errorf("request failed: %w", err)
        }
        defer resp.Body.Close()

        if resp.StatusCode != http.StatusOK {
                body, _ := io.ReadAll(resp.Body)
                return fmt.Errorf("unexpected status: %d, body: %s", resp.StatusCode, string(body))
        }

        return nil
}

// 生成 @ 标签字符串(在消息内容中显示)
func buildAtTags(mobiles []string) string {
        var tags string
        for _, m := range mobiles {
                tags += fmt.Sprintf(" @%s", m)
        }
        return tags
}

func genSign(secret string, timestamp int64) string {
        stringToSign := fmt.Sprintf("%d\n%s", timestamp, secret)
        h := hmac.New(sha256.New, []byte(secret))
        h.Write([]byte(stringToSign))
        return base64.StdEncoding.EncodeToString(h.Sum(nil))
}

func main() {
        // 打开日志文件
        logFile, err := setupLogging()
        if err != nil {
                log.Fatal(err)
        }
        defer logFile.Close()

        // 初始化 MongoDB
        if err := initMongoDB(); err != nil {
                log.Fatalf("MongoDB 初始化失败: %v", err)
        }
        defer mongoClient.Disconnect(context.Background())

        // 初始化 Kubernetes Client
        if err := initK8SClient(); err != nil {
                log.Fatalf("Kubernetes 客户端初始化失败: %v", err)
        }

        // 启动 Prometheus metrics
        go startMetricsServer()

        monitor, err := NewMonitor()
        if err != nil {
                fmt.Printf("初始化 Monitor 失败: %v", err)
        }

        ticker := time.NewTicker(CheckInterval)
        defer ticker.Stop()

        // 首次立即更新
        if err := monitor.UpdatePodIP(); err != nil {
                fmt.Printf("首次 IP 更新失败: %v", err)
        }

        for range ticker.C {
                // 每10分钟更新一次 IP
                if time.Since(monitor.lastIPUpdate) > time.Minute*10 {
                        if err := monitor.UpdatePodIP(); err != nil {
                                fmt.Printf("定时 IP 更新失败: %v", err)
                        }
                }

                if err := monitor.CheckHealth(); err != nil {
                        fmt.Printf("健康检查失败: %v", err)
                }
        }
}
go run monstache.go

运行后会在本地启动6666端口,脚本输出到当前目录的monstache.log中。

file

查看 monstache_health_status metrics的值。

file

prometheus上如果想看的话,需要修改配置增加6666端口的job。

首先配置svc和ep指向6666端口。

apiVersion: v1
kind: Service
metadata:
  name: monstache-metrics
  labels:
    app: monstache
spec:
  ports:
    - port: 6666
      name: monstache-metrics
      targetPort: 6666
---
apiVersion: v1
kind: Endpoints
metadata:
  labels:
    app: monstache
  name: monstache-metrics
subsets:
- addresses:
  - ip: 10.0.6.198
    nodeName: k8s-node008
  ports:
  - name: monstache-metrics
    port: 6666
    protocol: TCP

然后修改prometheus.yml,添加job,重启prometheus。

- job_name: 'monstache-monitor'
    scrape_interval: 30s
    metrics_path: /metrics
    static_configs:
      - targets: ['10.0.0.212:6666']  # monstache的Service和端口
        labels:
          group: 'monstache'

file

file

我这里使用servicemonitor不生效,应该是prometheus版本问题。

monstache同步逻辑

开启resume=true后,它允许在同步过程中断后恢复同步的状态,而不会重复已经同步的数据。

direct-read-namespaces中的集合数据会被一次性读取并同步到 Elasticsearch。只是首次同步时会全量同步。后续重启服务还是增量同步。

change-stream-namespaces中的集合会被实时监听,所有变更操作(如插入、更新、删除)都会被捕获并同步。

namespace-regex会根据正则表达式匹配符合条件的所有集合,并根据是否启用了 Change Stream 或 Direct Read 来同步它们的数据。resume 会确保即使多个集合匹配正则表达式,数据也不会被重复同步。

resume-strategy = 1,表示resume会使用token而不是时间戳进行增量同步。它默认会在MongoDB的monstache.tokens中保存上次同步的token。下次同步会根据这个token进行同步。

在monstache库的token集合中可以看到同步的token值:

file

第一次同步完成后,需要注释掉 direct-read-namespaces,否则重启monstache还会是全量同步。

参考:
https://github.com/rwynn/monstache/issues/355
https://github.com/rwynn/monstache/issues/628

多index同步

同一个MongoDB的库同步到一个es的多个index。可以配置多个mapping,每个块指定不同的 index。

[[mapping]]
    namespace = "deviceAccess.device"
    index = "device"  # 同步到索引 device1

[[mapping]]
    namespace = "deviceAccess.device"
    index = "device1"  # 同时同步到索引 device2

[[mapping]]
    namespace = "deviceAccess.dev_channel"
    index = "dev_channel"  # 原有配置保持不变

经测试该配置不生效,新增数据只会同步在device1中,而device中没有。

也可以使用monstache的script脚本功能实现更高级的需求。
https://rwynn.github.io/monstache-site/config/#script_2
https://rwynn.github.io/monstache-site/advanced/#middleware

script支持go和JavaScript。

go参考:
https://rwynn.github.io/monstache-site/advanced/#golang
https://pkg.go.dev/github.com/rwynn/monstache/monstachemap
https://github.com/rwynn/monstache/issues/653
https://github.com/rwynn/monstache/issues/319
JavaScript参考:
https://rwynn.github.io/monstache-site/advanced/#javascript

高可用

https://rwynn.github.io/monstache-site/advanced/#high-availability

启动多个monstache进程,并使用 相同的cluster-name配置,高可用性的工作原理是确保在任何给定时间 MongoDB 的 monstache.cluster 集合中有一个活动进程,只有这个进程会进行同步,此集合中不存在的进程将暂停。monstache.cluster 集合中的Documents会分配一个 TTL。当此集合中的Documents超时时,MongoDB 将从集合中删除它,并且 monstache 集群中的另一个进程将有机会写入该集合并成为新的活动进程。

配置cluster-name时,resume功能会自动打开,resume-name将成为集群的名称。这是为了确保每个进程都能够从上一个进程停止的位置开始同步。

file

遇到的问题

脚本使用的是monstache的svc地址,线上环境是采用高可用模式的,有2个pod,但只要一个pod是工作的,另一个pod是备用的。当通过svc访问到备用pod时,stats接口的地址可能返回为0,导致脚本判断错误。

file

file

需要修改为正常工作pod的地址。判断方法就是进入pod的/var/log/es目录,有trace日志的就是工作的pod,不工作的pod日志目录中是没有trace日志的。

file

解决方法:需要修改脚本。先到MongoDB中获取monstache.cluster中hosts字段,这个字段就是pod name,然后到k8s中获取这个pod的ip,再根据这个ip来调用/stats。上面的代码已经更新。

0 0 投票数
文章评分
订阅评论
提醒
guest

0 评论
最旧
最新 最多投票
内联反馈
查看所有评论

相关文章

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部
0
希望看到您的想法,请您发表评论x