背景
我的环境里使用了monstache来进行mongodb和es的同步,但是monstache有时候会假死,这就会导致pod状态正常,但实际上同步已经停止了,日志又看不到,所以需要给monstache加一个监控,当monstache发生假死时发送告警消息,然后自动删除假死的pod,达到自愈的效果。
官方文档:https://rwynn.github.io/monstache-site/config
仓库地址:https://github.com/rwynn/monstache
实现
1.开启http-server
首先修改配置文件config.toml,k8s内即monstache-config的configmap。
enable-http-server=true
2.开启日志
[logs]
info = "/var/log/es/info.log"
warn = "/var/log/es/wran.log"
error = "/var/log/es/error.log"
trace = "/var/log/es/trace.log"
只配置这个不生效,需要配置MONSTACHE_LOG_DIR环境变量。

env:MONSTACHE_LOG_DIR=/var/log/es

重启pod,在/var/log/es目录中就会有info.log,error.log,stats.log,warn.log。
3.配置服务
apiVersion: v1
kind: Service
metadata:
name: monstache
namespace: auto
spec:
ports:
- name: monstache
port: 8080
protocol: TCP
targetPort: 8080
selector:
app: monstache
type: ClusterIP

通过判断Succeeded数值在2分钟内是否发生变化来判断monstache服务是否发生了假死。,如果发生了假死发送消息通知到花茶,并删除假死的pod。
4.脚本实现
monstache.go,修改MongoURI和钉钉DingdingWebhook,Secret,还有CheckHealth和sendAlert函数中的手机号。
package main
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"io"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"log"
"net/http"
"net/url"
"os"
"strconv"
"sync"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// 配置项(建议通过环境变量注入)
var (
MongoURI = "mongodb://root:root@1.2.3.4:3717/"
MongoDBName = "monstache"
MongoCollection = "cluster"
DingdingWebhook = "https://oapi.dingtalk.com/robot/send?access_token=xxx"
Secret = "xxx" // 钉钉加签密钥
CheckInterval = 10 * time.Minute // 检查间隔
StuckThreshold = 3 * time.Minute // 假死判定阈值
PrometheusPort = ":6666" // 指标暴露端口
mongoClient *mongo.Client
k8sClient *kubernetes.Clientset
// 健康状态metrics指标(1=正常,0=异常)
healthStatusGauge = promauto.NewGauge(prometheus.GaugeOpts{
Name: "monstache_health_status",
Help: "Monstache health status (1=healthy, 0=unhealthy)",
})
)
type Monitor struct {
mu sync.RWMutex
currentPodIP string
lastValue int64
lastChange time.Time
currentAlarm bool
lastIPUpdate time.Time
}
type MonstacheStats struct {
Succeeded int64 `json:"Succeeded"`
}
// 配置日志
func setupLogging() (*os.File, error) {
logFile, err := os.OpenFile("monstache.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return nil, fmt.Errorf("无法打开日志文件: %w", err)
}
log.SetOutput(logFile)
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
return logFile, nil
}
func initMongoDB() error {
client, err := mongo.Connect(context.Background(),
options.Client().ApplyURI(MongoURI).
SetConnectTimeout(10*time.Second).
SetServerSelectionTimeout(30*time.Second))
if err != nil {
return err
}
// 验证连接
if err := client.Ping(context.Background(), nil); err != nil {
return fmt.Errorf("MongoDB 连接测试失败: %w", err)
}
mongoClient = client
fmt.Println("MongoDB连接成功")
return nil
}
func initK8SClient() error {
var kubeConfig = "/root/.kube/config"
// 初始化 Kubernetes 客户端
config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
if err != nil {
return fmt.Errorf("初始化集群配置失败: %w", err)
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
return fmt.Errorf("创建客户端失败: %w", err)
}
k8sClient = client
fmt.Println("Kubernetes 连接成功")
return nil
}
// UpdatePodIP Pod IP 管理
func (m *Monitor) UpdatePodIP() error {
// 1. 从 MongoDB 获取 Pod 名称
podName, err := m.getPodNameFromMongo()
if err != nil {
return fmt.Errorf("获取 Pod 名称失败: %w", err)
}
// 2. 从 Kubernetes 获取 Pod IP
podIP, err := m.getPodIP(podName)
if err != nil {
return fmt.Errorf("获取 Pod IP 失败: %w", err)
}
fmt.Printf("当前 Pod Name, IP: %s %s\n", podName, podIP)
// 3. 更新状态
m.mu.Lock()
defer m.mu.Unlock()
if m.currentPodIP != podIP {
log.Printf("检测到 Pod IP 变更: %s -> %s", m.currentPodIP, podIP)
fmt.Printf("检测到 Pod IP 变更: %s -> %s\n", m.currentPodIP, podIP)
m.currentPodIP = podIP
}
m.lastIPUpdate = time.Now()
return nil
}
func (m *Monitor) getPodNameFromMongo() (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
collection := mongoClient.Database(MongoDBName).Collection(MongoCollection)
var result struct {
Host string `bson:"host"`
}
err := collection.FindOne(ctx, bson.M{}).Decode(&result)
if err != nil {
return "", fmt.Errorf("查询失败: %w", err)
}
if result.Host == "" {
return "", fmt.Errorf("host 字段为空")
}
return result.Host, nil
}
func (m *Monitor) getPodIP(podName string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
pod, err := k8sClient.CoreV1().Pods("firecloud").Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return "", fmt.Errorf("获取 Pod 信息失败: %w", err)
}
if pod.Status.PodIP == "" {
return "", fmt.Errorf("pod 尚未分配 IP")
}
return pod.Status.PodIP, nil
}
// 根据 Pod IP 获取 Pod 名称
func (m *Monitor) getPodNameByIP(podIP string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 遍历命名空间中的所有 Pods,找到 IP 匹配的 Pod
pods, err := k8sClient.CoreV1().Pods("firecloud").List(ctx, metav1.ListOptions{})
if err != nil {
return "", fmt.Errorf("获取 Pod 信息失败: %w", err)
}
for _, pod := range pods.Items {
if pod.Status.PodIP == podIP {
fmt.Printf("pod name: %v\n", pod.Name)
return pod.Name, nil
}
}
return "", fmt.Errorf("未找到对应 IP 的 Pod: %s", podIP)
}
// NewMonitor 修改 Monitor 的构造函数,加载告警状态
func NewMonitor() (*Monitor, error) {
status, err := loadAlertStatus()
if err != nil {
return nil, fmt.Errorf("加载告警状态失败: %w", err)
}
return &Monitor{
currentAlarm: status,
}, nil
}
// CheckHealth 核心健康检查逻辑
func (m *Monitor) CheckHealth() error {
m.mu.RLock()
currentIP := m.currentPodIP
m.mu.RUnlock()
if currentIP == "" {
return fmt.Errorf("当前没有有效的 Pod IP")
}
// 构建动态 URL
monstacheURL := fmt.Sprintf("http://%s:8080/stats", currentIP)
value, err := getSucceededValue(monstacheURL)
if err != nil {
updateHealthStatus(0)
return fmt.Errorf("获取状态失败: %w", err)
}
now := time.Now()
healthStatus := 1 // 默认为健康
// 判断逻辑
if m.lastValue == value {
// 值未变化,检查持续时间
if now.Sub(m.lastChange) > StuckThreshold {
healthStatus = 0 // 超过阈值标记为不健康
}
} else {
// 值发生变化,更新时间戳
m.lastValue = value
m.lastChange = now
}
// 更新指标
updateHealthStatus(healthStatus)
fmt.Println("当前健康状态:", healthStatus)
_, err = NewMonitor()
if err != nil {
log.Printf("加载告警状态失败: %v", err)
}
// 触发告警逻辑
if healthStatus == 0 && !m.currentAlarm {
fmt.Println("发送假死告警")
// 发送假死告警
sendAlert("monstache 服务假死!Succeeded 值长期未变化", []string{"12345678900", "12345678901"})
m.currentAlarm = true
// 保存当前告警状态
if err := saveAlertStatus(m.currentAlarm); err != nil {
log.Printf("保存告警状态失败: %v", err)
}
podName, err := m.getPodNameByIP(currentIP)
if err != nil {
log.Printf("根据 IP 获取 Pod 名称失败: %v", err)
} else {
err := m.deletePod(podName)
if err != nil {
log.Printf("删除 Pod 失败: %v", err)
} else {
fmt.Printf("Pod %s 已删除", podName)
sendAlert("monstache 已重启!请检查是否正常", []string{"12345678900", "12345678901"})
}
}
fmt.Println("告警状态已保存")
} else if healthStatus == 1 && m.currentAlarm {
fmt.Println("发送恢复告警")
// 发送恢复告警
sendAlert("monstache 服务恢复!Succeeded 值已更新", []string{"12345678900", "12345678901"})
m.currentAlarm = false
// 保存当前告警状态
if err := saveAlertStatus(m.currentAlarm); err != nil {
log.Printf("保存告警状态失败: %v", err)
}
fmt.Println("告警状态已保存")
}
return nil
}
func (m *Monitor) deletePod(podName string) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := k8sClient.CoreV1().Pods("firecloud").Delete(ctx, podName, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("删除 Pod 失败: %w", err)
}
fmt.Printf("Pod %s 已删除,Kubernetes 将会自动重启该 Pod\n", podName)
return nil
}
// 统一更新健康状态指标
func updateHealthStatus(status int) {
healthStatusGauge.Set(float64(status))
fmt.Println("更新健康状态指标")
}
func getSucceededValue(url string) (int64, error) {
resp, err := http.Get(url)
if err != nil {
return 0, fmt.Errorf("HTTP request failed: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return 0, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("failed to read response: %w", err)
}
var stats MonstacheStats
if err := json.Unmarshal(body, &stats); err != nil {
return 0, fmt.Errorf("failed to parse JSON: %w", err)
}
fmt.Println("Succeeded:", stats.Succeeded)
return stats.Succeeded, nil
}
func startMetricsServer() {
http.Handle("/metrics", promhttp.Handler())
log.Printf("启动指标服务器在 %s", PrometheusPort)
if err := http.ListenAndServe(PrometheusPort, nil); err != nil {
log.Fatalf("指标服务器启动失败: %v", err)
}
}
func sendAlert(message string, mobiles []string) {
fmt.Printf("发送花茶告警消息: %s", message)
newMessage := fmt.Sprintf("[Monstache 状态变更]\n%s\n检测时间: %s",
message, time.Now().Format("2006-01-02 15:04:05"))
if err := SendDingdingAlert(newMessage, []string{"12345678900", "12345678901"}); err != nil {
fmt.Printf("钉钉告警发送失败: %v", err)
}
}
// 保存告警状态
func saveAlertStatus(status bool) error {
fmt.Println("打开文件 monstache_status.txt")
file, err := os.OpenFile("monstache_status.txt", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
return fmt.Errorf("创建monstache_status.txt文件失败: %w", err)
}
defer file.Close()
if _, err := file.WriteString(fmt.Sprintf("%v", status)); err != nil {
return fmt.Errorf("保存告警状态到monstache_status.txt失败: %w", err)
}
fmt.Println("告警状态已保存到monstache_status.txt")
return nil
}
// 读取告警状态
func loadAlertStatus() (bool, error) {
file, err := os.Open("monstache_status.txt")
if err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, fmt.Errorf("打开monstache_status.txt文件失败: %w", err)
}
defer file.Close()
var status bool
_, err = fmt.Fscanf(file, "%t", &status)
if err != nil {
return false, fmt.Errorf("读取告警状态失败: %w", err)
}
fmt.Printf("读取告警状态成功: %v\n", status)
return status, nil
}
func SendDingdingAlert(message string, mobiles []string) error {
// 生成加签参数
timestamp := time.Now().UnixNano() / 1e6
sign := genSign(Secret, timestamp)
// 构造请求URL
u, _ := url.Parse(DingdingWebhook)
q := u.Query()
q.Add("timestamp", strconv.FormatInt(timestamp, 10))
q.Add("sign", sign)
u.RawQuery = q.Encode()
// 构建请求体
payload := map[string]interface{}{
"msgtype": "markdown",
"markdown": map[string]string{
"title": "Monstache 服务告警",
"text": "**告警详情**\n" + message + "\n\n**触发时间**: " + time.Now().Format("2006-01-02 15:04:05") + buildAtTags(mobiles),
},
"at": map[string]interface{}{
"atMobiles": mobiles, // 指定要 @ 的手机号列表
"isAtAll": false, // 不 @ 所有人
},
}
jsonData, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("failed to marshal payload: %w", err)
}
req, err := http.NewRequest("POST", u.String(), bytes.NewBuffer(jsonData))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{Timeout: 5 * time.Second}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("request failed: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("unexpected status: %d, body: %s", resp.StatusCode, string(body))
}
return nil
}
// 生成 @ 标签字符串(在消息内容中显示)
func buildAtTags(mobiles []string) string {
var tags string
for _, m := range mobiles {
tags += fmt.Sprintf(" @%s", m)
}
return tags
}
func genSign(secret string, timestamp int64) string {
stringToSign := fmt.Sprintf("%d\n%s", timestamp, secret)
h := hmac.New(sha256.New, []byte(secret))
h.Write([]byte(stringToSign))
return base64.StdEncoding.EncodeToString(h.Sum(nil))
}
func main() {
// 打开日志文件
logFile, err := setupLogging()
if err != nil {
log.Fatal(err)
}
defer logFile.Close()
// 初始化 MongoDB
if err := initMongoDB(); err != nil {
log.Fatalf("MongoDB 初始化失败: %v", err)
}
defer mongoClient.Disconnect(context.Background())
// 初始化 Kubernetes Client
if err := initK8SClient(); err != nil {
log.Fatalf("Kubernetes 客户端初始化失败: %v", err)
}
// 启动 Prometheus metrics
go startMetricsServer()
monitor, err := NewMonitor()
if err != nil {
fmt.Printf("初始化 Monitor 失败: %v", err)
}
ticker := time.NewTicker(CheckInterval)
defer ticker.Stop()
// 首次立即更新
if err := monitor.UpdatePodIP(); err != nil {
fmt.Printf("首次 IP 更新失败: %v", err)
}
for range ticker.C {
// 每10分钟更新一次 IP
if time.Since(monitor.lastIPUpdate) > time.Minute*10 {
if err := monitor.UpdatePodIP(); err != nil {
fmt.Printf("定时 IP 更新失败: %v", err)
}
}
if err := monitor.CheckHealth(); err != nil {
fmt.Printf("健康检查失败: %v", err)
}
}
}
go run monstache.go
运行后会在本地启动6666端口,脚本输出到当前目录的monstache.log中。

查看 monstache_health_status metrics的值。

prometheus上如果想看的话,需要修改配置增加6666端口的job。
首先配置svc和ep指向6666端口。
apiVersion: v1
kind: Service
metadata:
name: monstache-metrics
labels:
app: monstache
spec:
ports:
- port: 6666
name: monstache-metrics
targetPort: 6666
---
apiVersion: v1
kind: Endpoints
metadata:
labels:
app: monstache
name: monstache-metrics
subsets:
- addresses:
- ip: 10.0.6.198
nodeName: k8s-node008
ports:
- name: monstache-metrics
port: 6666
protocol: TCP
然后修改prometheus.yml,添加job,重启prometheus。
- job_name: 'monstache-monitor'
scrape_interval: 30s
metrics_path: /metrics
static_configs:
- targets: ['10.0.0.212:6666'] # monstache的Service和端口
labels:
group: 'monstache'


我这里使用servicemonitor不生效,应该是prometheus版本问题。
monstache同步逻辑
开启resume=true后,它允许在同步过程中断后恢复同步的状态,而不会重复已经同步的数据。
direct-read-namespaces中的集合数据会被一次性读取并同步到 Elasticsearch。只是首次同步时会全量同步。后续重启服务还是增量同步。
change-stream-namespaces中的集合会被实时监听,所有变更操作(如插入、更新、删除)都会被捕获并同步。
namespace-regex会根据正则表达式匹配符合条件的所有集合,并根据是否启用了 Change Stream 或 Direct Read 来同步它们的数据。resume 会确保即使多个集合匹配正则表达式,数据也不会被重复同步。
resume-strategy = 1,表示resume会使用token而不是时间戳进行增量同步。它默认会在MongoDB的monstache.tokens中保存上次同步的token。下次同步会根据这个token进行同步。
在monstache库的token集合中可以看到同步的token值:

第一次同步完成后,需要注释掉 direct-read-namespaces,否则重启monstache还会是全量同步。
参考:
https://github.com/rwynn/monstache/issues/355
https://github.com/rwynn/monstache/issues/628
多index同步
同一个MongoDB的库同步到一个es的多个index。可以配置多个mapping,每个块指定不同的 index。
[[mapping]]
namespace = "deviceAccess.device"
index = "device" # 同步到索引 device1
[[mapping]]
namespace = "deviceAccess.device"
index = "device1" # 同时同步到索引 device2
[[mapping]]
namespace = "deviceAccess.dev_channel"
index = "dev_channel" # 原有配置保持不变
经测试该配置不生效,新增数据只会同步在device1中,而device中没有。
也可以使用monstache的script脚本功能实现更高级的需求。
https://rwynn.github.io/monstache-site/config/#script_2
https://rwynn.github.io/monstache-site/advanced/#middleware
script支持go和JavaScript。
go参考:
https://rwynn.github.io/monstache-site/advanced/#golang
https://pkg.go.dev/github.com/rwynn/monstache/monstachemap
https://github.com/rwynn/monstache/issues/653
https://github.com/rwynn/monstache/issues/319
JavaScript参考:
https://rwynn.github.io/monstache-site/advanced/#javascript
高可用
https://rwynn.github.io/monstache-site/advanced/#high-availability
启动多个monstache进程,并使用 相同的cluster-name配置,高可用性的工作原理是确保在任何给定时间 MongoDB 的 monstache.cluster 集合中有一个活动进程,只有这个进程会进行同步,此集合中不存在的进程将暂停。monstache.cluster 集合中的Documents会分配一个 TTL。当此集合中的Documents超时时,MongoDB 将从集合中删除它,并且 monstache 集群中的另一个进程将有机会写入该集合并成为新的活动进程。
配置cluster-name时,resume功能会自动打开,resume-name将成为集群的名称。这是为了确保每个进程都能够从上一个进程停止的位置开始同步。

遇到的问题
脚本使用的是monstache的svc地址,线上环境是采用高可用模式的,有2个pod,但只要一个pod是工作的,另一个pod是备用的。当通过svc访问到备用pod时,stats接口的地址可能返回为0,导致脚本判断错误。


需要修改为正常工作pod的地址。判断方法就是进入pod的/var/log/es目录,有trace日志的就是工作的pod,不工作的pod日志目录中是没有trace日志的。

解决方法:需要修改脚本。先到MongoDB中获取monstache.cluster中hosts字段,这个字段就是pod name,然后到k8s中获取这个pod的ip,再根据这个ip来调用/stats。上面的代码已经更新。