背景
我的环境有个kafka线上告警,当前kafka消息堆积到20万时,会发送群通知和打电话。告警规则如下:

我不想让他打电话。目前我这里出现堆积的都是以_docking_analog为结尾的consumerGroup,而恢复的方法都是重启对应消费组的pod,那为什么不在消息还没到20万时,就把pod重启了,这样就不会consumerGroup,而打电话了,告警会自动恢复。
项目结构
我用go实现的。
- 首先需要对接阿里云云监控的SDK,获取对应告警规则的返回结果。
- 然后根据告警触发时获取监控数据。如果consumerGroup包括_docking_analog的前缀。且堆积量大于15万,获取_docking_analog的前缀。
- 再根据前缀到k8s中获取对应的pod并重启。
- 最后再发送钉钉消息到群里。
- 然后配置文件单独拎出来,包括ak,sk,告警规则id,region,钉钉的webhookUrl,secret,k8s的namespace。
这样项目结构如下:

实现
首先在阿里云openapi页面调用DescribeMetricRuleList接口,填写RuleIds告警规则id,发起调用。

下面是一部分返回结果:
{
"RequestId": "CB880416-5453-5290-A9AF-BC153F350E09",
"Total": 1,
"Alarms": {
"Alarm": [
{
"GroupName": "",
"NoEffectiveInterval": "",
"MailSubject": "${serviceType}-${metricName}-${levelDescription}通知(${dimensions})",
"GmtUpdate": 1749087864000,
"SourceType": "METRIC",
"RuleId": "clone_uuid_bfb66e7cba10df82812348d5",
"Prometheus": {},
"MetricName": "message_accumulation",
"CompositeExpression": {},
"RuleName": "复制规则_消费堆积报警",
"SilenceTime": 21600,
"ContactGroups": "kafka",
"GroupBy": "",
"Period": 60,
"Dimensions": "",
"RuleType": "STATIC",
"EffectiveInterval": "09:00-17:59",
"NoDataPolicy": "KEEP_LAST_STATE",
"Namespace": "acs_kafka",
"AlertState": "OK",
"GroupId": "",
"GmtCreate": 1733896962000,
"EnableState": true,
"ProductCategory": "kafka",
"Escalations": {
"Critical": {
"ComparisonOperator": "GreaterThanOrEqualToThreshold",
"Times": 3,
"Statistics": "Value",
"Threshold": "200000"
},
"Info": {
"ComparisonOperator": "GreaterThanOrEqualToThreshold",
"Times": 10,
"Statistics": "Value",
"Threshold": "15000"
},
"Warn": {
"ComparisonOperator": "GreaterThanOrEqualToThreshold",
"Times": 3,
"Statistics": "Value",
"Threshold": "50000"
}
},
"Webhook": "",
"Resources": "[{\"instanceId\":\"alikafka_pre-cn-83l39005\",\"requestRegionId\":\"cn-hangzhou\",\"consumerGroup\":\"xyjxy_docking_analog\"},{\"instanceId\":\"alikafka_pre-cn-83l39005\",\"requestRegionId\":\"cn-hangzhou\",\"consumerGroup\":\"cust_docking_analog\"},]",
"Interval": 60
}
]
},
"Code": "200",
"Success": true
}
这是阿里云云监控sdk文档地址:https://api.aliyun.com/api-tools/sdk/Cms?spm=api-workbench.api_explorer.0.0.e90655ab0U4DVi&version=2019-01-01&language=go-tea&tab=primer-doc
https://api.aliyun.com/document/Cms/2019-01-01/DescribeMetricList
定义的结构体中的字段需要和sdk中的字段保持一致,否则会查询不到对应的告警规则。比如AlertState,RuleId。
aliyun.go
调用DescribeMetricRuleList接口获取告警的状态,如果告警状态为ALARM时,进一步调用DescribeMetricList接口来获取详细的监控数据。再重启pod,发送通知。
package aliyun
import (
"encoding/json"
"fmt"
openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client"
openapiutil "github.com/alibabacloud-go/openapi-util/service"
util "github.com/alibabacloud-go/tea-utils/v2/service"
"github.com/alibabacloud-go/tea/tea"
credential "github.com/aliyun/credentials-go/credentials"
"kafka-monitor/dingtalk"
"kafka-monitor/k8s"
)
const (
// 定义阿里云API调用所需的各种常量,包括API版本、协议、认证类型等,以及告警状态常量。
apiVersion = "2019-01-01"
apiProtocol = "HTTPS"
apiMethod = "POST"
apiAuthType = "AK"
apiStyle = "RPC"
apiPathname = "/"
apiReqBodyType = "json"
apiBodyType = "json"
namespaceKafka = "acs_kafka"
metricName = "message_accumulation"
StatusOK = "OK"
StatusAlarm = "ALARM"
)
// MetricData 监控数据结构,包含实例ID、消费者组、堆积量值和时间戳。
type MetricData struct {
InstanceID string `json:"instanceId"`
ConsumerGroup string `json:"consumerGroup"`
Value float64 `json:"value"`
Timestamp int64 `json:"timestamp"`
}
// AlarmStatus 告警状态,包含规则ID、状态和监控数据列表。
type AlarmStatus struct {
RuleID string `json:"ruleId"`
Status string `json:"status"` // OK or ALARM
MetricData []MetricData `json:"metricData"`
}
// APIResponse 基础API响应,包含状态码、消息和成功标志。
type APIResponse struct {
Code string `json:"Code"`
Message string `json:"Message"`
Success bool `json:"Success"`
}
// AlarmRuleResponse 告警规则响应,用于解析阿里云API返回的告警规则信息。
type AlarmRuleResponse struct {
APIResponse
Alarms struct {
Alarm []struct {
// API返回的字段名是 "RuleId"
RuleID string `json:"RuleId"`
// API返回的字段名是 "AlertState"
State string `json:"AlertState"`
// 增加 Resources 字段解析,方便调试或后续过滤
Resources string `json:"Resources"`
} `json:"Alarm"`
} `json:"Alarms"`
}
// ClientWrapper 阿里云客户端包装器,包含阿里云客户端实例和区域ID。
type ClientWrapper struct {
client *openapi.Client
regionID string
}
// NewClientWrapper 创建新的客户端包装器,用于与阿里云CMS API进行交互。
func NewClientWrapper(accessKeyID, accessKeySecret, regionID string) (*ClientWrapper, error) {
credConf := &credential.Config{
Type: tea.String("access_key"),
AccessKeyId: tea.String(accessKeyID),
AccessKeySecret: tea.String(accessKeySecret),
}
cred, err := credential.NewCredential(credConf)
if err != nil {
return nil, fmt.Errorf("创建凭据失败: %w", err)
}
config := &openapi.Config{
Credential: cred,
Endpoint: tea.String(fmt.Sprintf("metrics.%s.aliyuncs.com", regionID)),
}
client, err := openapi.NewClient(config)
if err != nil {
return nil, fmt.Errorf("创建CMS客户端失败: %w", err)
}
return &ClientWrapper{
client: client,
regionID: regionID,
}, nil
}
// baseParams 创建基础API参数,用于阿里云API调用。
func baseParams(action string) *openapi.Params {
return &openapi.Params{
Action: tea.String(action),
Version: tea.String(apiVersion),
Protocol: tea.String(apiProtocol),
Method: tea.String(apiMethod),
AuthType: tea.String(apiAuthType),
Style: tea.String(apiStyle),
Pathname: tea.String(apiPathname),
ReqBodyType: tea.String(apiReqBodyType),
BodyType: tea.String(apiBodyType),
}
}
// GetAlarmStatus 获取指定告警规则的状态信息。创建客户端包装器并调用具体实现。
func GetAlarmStatus(accessKeyID, accessKeySecret, regionID, alarmRuleID string) (*AlarmStatus, error) {
clientWrapper, err := NewClientWrapper(accessKeyID, accessKeySecret, regionID)
if err != nil {
return nil, err
}
return clientWrapper.getAlarmStatus(alarmRuleID)
}
// getAlarmStatus 获取告警状态的具体实现
func (cw *ClientWrapper) getAlarmStatus(alarmRuleID string) (*AlarmStatus, error) {
// 获取告警规则列表
ruleResponse, err := cw.DescribeMetricRuleList(alarmRuleID)
if err != nil {
return nil, err
}
// 查找指定的告警规则
var alarmStatus *AlarmStatus
for _, rule := range ruleResponse.Alarms.Alarm {
// 调试输出
// fmt.Printf("找到规则: ID=%s, State=%s\n", rule.RuleID, rule.State)
if rule.RuleID == alarmRuleID {
alarmStatus = &AlarmStatus{
RuleID: alarmRuleID,
Status: rule.State,
}
break
}
}
if alarmStatus == nil {
return nil, fmt.Errorf("未找到告警规则: %s", alarmRuleID)
}
// 如果告警状态为ALARM,获取详细的监控数据
if alarmStatus.Status == StatusAlarm {
metricData, err := cw.getMetricData()
if err != nil {
return nil, fmt.Errorf("获取监控数据失败: %w", err)
}
alarmStatus.MetricData = metricData
}
return alarmStatus, nil
}
// DescribeMetricRuleList 获取告警规则列表
func (cw *ClientWrapper) DescribeMetricRuleList(alarmRuleID string) (*AlarmRuleResponse, error) {
params := baseParams("DescribeMetricRuleList")
queries := map[string]interface{}{
"RuleIds": alarmRuleID,
}
request := &openapi.OpenApiRequest{
Query: openapiutil.Query(queries),
}
resp, err := cw.client.CallApi(params, request, &util.RuntimeOptions{})
if err != nil {
return nil, fmt.Errorf("调用DescribeMetricRuleList失败: %w", err)
}
var response AlarmRuleResponse
if err := cw.parseResponse(resp, &response); err != nil {
return nil, err
}
if !response.Success {
return nil, fmt.Errorf("API调用失败: %s - %s", response.Code, response.Message)
}
return &response, nil
}
// getMetricData 通过调用DescribeMetricList接口获取Kafka消息堆积监控数据。
func (cw *ClientWrapper) getMetricData() ([]MetricData, error) {
params := baseParams("DescribeMetricList")
queries := map[string]interface{}{
"Namespace": namespaceKafka,
"MetricName": metricName,
"Period": "60", // 指定周期,通常 Kafka 堆积量是 60s 一个点
"Length": "100", // 获取最近的数据点数量,防止拉取过多数据
}
request := &openapi.OpenApiRequest{
Query: openapiutil.Query(queries),
}
resp, err := cw.client.CallApi(params, request, &util.RuntimeOptions{})
if err != nil {
return nil, fmt.Errorf("调用DescribeMetricList失败: %w", err)
}
return cw.parseMetricData(resp)
}
// parseMetricData 解析阿里云API返回的监控数据,提取最新的数据点并转换为内部结构。
func (cw *ClientWrapper) parseMetricData(resp map[string]interface{}) ([]MetricData, error) {
// 1. 获取 body
body, ok := resp["body"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("API响应格式错误: body字段不是map")
}
// 2. 检查 Success 字段
if success, ok := body["Success"].(bool); ok && !success {
code, _ := body["Code"].(string)
msg, _ := body["Message"].(string)
return nil, fmt.Errorf("API返回错误: %s - %s", code, msg)
}
// 3. 提取 Datapoints
// 注意:阿里云 API 返回的 Datapoints 是一个 JSON 字符串,需要再次 Unmarshal
datapointsStr, ok := body["Datapoints"].(string)
if !ok || datapointsStr == "" {
// 如果没有数据,Datapoints 可能为空或不存在,这在正常情况下是可能的
return []MetricData{}, nil
}
// 定义 Datapoint 内部结构,对应阿里云返回的实际 JSON 数组项
type CloudMonitorDatapoint struct {
InstanceID string `json:"instanceId"`
ConsumerGroup string `json:"consumerGroup"`
Maximum float64 `json:"Maximum"` // 堆积量通常关注最大值或当前值
Average float64 `json:"Average"`
Timestamp int64 `json:"timestamp"`
}
var points []CloudMonitorDatapoint
if err := json.Unmarshal([]byte(datapointsStr), &points); err != nil {
return nil, fmt.Errorf("解析 Datapoints JSON失败: %w", err)
}
// 4. 数据清洗:保留每个 Group 最新的一个点
latestMap := make(map[string]CloudMonitorDatapoint)
for _, p := range points {
// 唯一键:InstanceID + ConsumerGroup
key := fmt.Sprintf("%s|%s", p.InstanceID, p.ConsumerGroup)
if exist, ok := latestMap[key]; ok {
// 如果当前点的时间更新,则替换
if p.Timestamp > exist.Timestamp {
latestMap[key] = p
}
} else {
latestMap[key] = p
}
}
// 5. 转换为 MetricData 格式返回
var result []MetricData
for _, p := range latestMap {
// 优先使用 Maximum,如果为0尝试 Average (通常对于 Gauge 类型指标,Max=Min=Avg)
val := p.Maximum
if val == 0 {
val = p.Average
}
result = append(result, MetricData{
InstanceID: p.InstanceID,
ConsumerGroup: p.ConsumerGroup,
Value: val,
Timestamp: p.Timestamp,
})
}
return result, nil
}
// parseResponse 解析API响应
func (cw *ClientWrapper) parseResponse(resp map[string]interface{}, target interface{}) error {
body, exists := resp["body"]
if !exists {
return fmt.Errorf("响应中缺少body字段")
}
bodyBytes, err := json.Marshal(body)
if err != nil {
return fmt.Errorf("序列化响应体失败: %w", err)
}
if err := json.Unmarshal(bodyBytes, target); err != nil {
return fmt.Errorf("解析响应失败: %w", err)
}
return nil
}
// ExtractPrefix 从consumerGroup中提取前缀(第一个下划线前的部分)
func ExtractPrefix(consumerGroup string) string {
for i, char := range consumerGroup {
if char == '_' {
return consumerGroup[:i]
}
}
return consumerGroup
}
// RestartMatchingPods 查找并重启匹配的Pod
func RestartMatchingPods(config *Config, prefix string) error {
// 创建K8s客户端
k8sClient, err := k8s.NewK8sClient(config.K8s.Namespace)
if err != nil {
return fmt.Errorf("创建K8s客户端失败: %v", err)
}
// 查找匹配的Pod
pods, err := k8sClient.FindPodsByPrefix(prefix)
if err != nil {
return fmt.Errorf("查找Pod失败: %v", err)
}
if len(pods) == 0 {
fmt.Printf("未找到包含前缀 '%s' 的Pod\n", prefix)
return nil
}
fmt.Printf("找到匹配的Pod: %v\n", pods)
// 重启Pod
err = k8sClient.RestartPods(pods)
if err != nil {
return fmt.Errorf("重启Pod失败: %v", err)
}
fmt.Printf("成功重启 %d 个Pod\n", len(pods))
return nil
}
// SendDingTalkNotification 发送钉钉通知
func SendDingTalkNotification(config *Config, consumerGroup string, value float64) {
// 创建钉钉机器人
robot := dingtalk.NewDingTalkRobot(config.Dingtalk.WebhookURL, config.Dingtalk.Secret)
// 构造消息内容
message := fmt.Sprintf("对应的consumerGroup(%s)消息堆积量超过150000,当前值为%.0f,已重启对应的pod,请继续观察消息堆积量。", consumerGroup, value)
// 发送消息
err := robot.SendMessage(message)
if err != nil {
fmt.Printf("发送钉钉消息失败: %v\n", err)
} else {
fmt.Println("钉钉消息发送成功")
}
}
config.go
读取config.json配置文件中的配置。
package aliyun
import (
"encoding/json"
"fmt"
"os"
)
// Config 配置文件结构
type Config struct {
Aliyun struct {
AccessKeyID string `json:"accessKeyId"`
AccessKeySecret string `json:"accessKeySecret"`
RegionID string `json:"regionId"`
AlarmRuleID string `json:"alarmRuleId"`
} `json:"aliyun"`
K8s struct {
Namespace string `json:"namespace"`
} `json:"k8s"`
Dingtalk struct {
WebhookURL string `json:"webhookUrl"`
Secret string `json:"secret"`
} `json:"dingtalk"`
}
// LoadConfig 从文件加载配置
func LoadConfig(configPath string) (*Config, error) {
data, err := os.ReadFile(configPath)
if err != nil {
return nil, fmt.Errorf("读取配置文件失败: %w", err)
}
var config Config
if err := json.Unmarshal(data, &config); err != nil {
return nil, fmt.Errorf("解析配置文件失败: %w", err)
}
return &config, nil
}
robot.go
定义发送钉钉机器人消息的逻辑。
package dingtalk
import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
// DingTalkRobot 钉钉机器人
type DingTalkRobot struct {
WebhookURL string
Secret string
}
// Message 钉钉消息结构
type Message struct {
MsgType string `json:"msgtype"`
Text TextBody `json:"text"`
At AtBody `json:"at"`
}
// TextBody 文本消息体
type TextBody struct {
Content string `json:"content"`
}
// AtBody @相关用户
type AtBody struct {
AtMobiles []string `json:"atMobiles"`
IsAtAll bool `json:"isAtAll"`
}
// NewDingTalkRobot 创建新的钉钉机器人实例
func NewDingTalkRobot(webhookURL, secret string) *DingTalkRobot {
return &DingTalkRobot{
WebhookURL: webhookURL,
Secret: secret,
}
}
// GenerateSign 生成签名
func (d *DingTalkRobot) GenerateSign(timestamp int64) string {
stringToSign := fmt.Sprintf("%d\n%s", timestamp, d.Secret)
h := hmac.New(sha256.New, []byte(d.Secret))
h.Write([]byte(stringToSign))
signData := h.Sum(nil)
return base64.StdEncoding.EncodeToString(signData)
}
// SendMessage 发送消息
func (d *DingTalkRobot) SendMessage(content string) error {
// 获取当前时间戳
timestamp := time.Now().UnixNano() / 1e6
// 生成签名
sign := d.GenerateSign(timestamp)
// 构造完整URL
url := fmt.Sprintf("%s×tamp=%d&sign=%s", d.WebhookURL, timestamp, sign)
// 构造消息体
message := Message{
MsgType: "text",
Text: TextBody{
Content: content,
},
At: AtBody{
IsAtAll: false,
},
}
// 序列化消息体
jsonData, err := json.Marshal(message)
if err != nil {
return fmt.Errorf("序列化消息失败: %v", err)
}
// 创建HTTP请求
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
return fmt.Errorf("创建请求失败: %v", err)
}
req.Header.Set("Content-Type", "application/json")
// 发送请求
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("发送请求失败: %v", err)
}
defer resp.Body.Close()
// 读取响应
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("读取响应失败: %v", err)
}
// 检查响应状态
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("发送消息失败,状态码: %d, 响应: %s", resp.StatusCode, string(body))
}
fmt.Printf("消息发送成功: %s\n", string(body))
return nil
}
client.go
连接k8s,查找并重启pod。
package k8s
import (
"context"
"fmt"
"strings"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
// K8sClient K8s客户端封装
type K8sClient struct {
Clientset *kubernetes.Clientset
Namespace string
}
// NewK8sClient 创建新的K8s客户端
func NewK8sClient(namespace string) (*K8sClient, error) {
// 使用kubeconfig文件创建配置
var kubeConfig = "/root/.kube/config"
config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
if err != nil {
return nil, fmt.Errorf("构建K8s配置失败: %v", err)
}
// 创建clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("创建K8s客户端失败: %v", err)
}
return &K8sClient{
Clientset: clientset,
Namespace: namespace,
}, nil
}
// FindPodsByPrefix 根据前缀查找Pod
func (kc *K8sClient) FindPodsByPrefix(prefix string) ([]string, error) {
// 列出指定命名空间中的所有Pod
pods, err := kc.Clientset.CoreV1().Pods(kc.Namespace).List(context.TODO(), v1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("列出Pod失败: %v", err)
}
var matchedPods []string
for _, pod := range pods.Items {
// 检查Pod名称是否包含前缀
if strings.Contains(pod.Name, prefix) {
matchedPods = append(matchedPods, pod.Name)
}
}
return matchedPods, nil
}
// RestartPods 重启指定的Pods
func (kc *K8sClient) RestartPods(podNames []string) error {
for _, podName := range podNames {
// 删除Pod,K8s会自动重新创建
err := kc.Clientset.CoreV1().Pods(kc.Namespace).Delete(context.TODO(), podName, v1.DeleteOptions{})
if err != nil {
return fmt.Errorf("删除Pod %s 失败: %v", podName, err)
}
fmt.Printf("成功删除Pod: %s\n", podName)
}
return nil
}
test_trigger.go
为了告警没有触发时,测试pod是否可以正常重启并发送钉钉消息。
package main
import (
"fmt"
"kafka-monitor/aliyun"
"log"
"os"
)
func main() {
// 1. 检查参数
if len(os.Args) < 2 {
fmt.Printf("使用方法: go run test_trigger.go <配置文件路径>\n")
fmt.Println("例如: go run test_trigger.go config.json")
os.Exit(1)
}
configPath := os.Args[1]
// 2. 加载配置 (获取K8s和钉钉的配置信息)
config, err := aliyun.LoadConfig(configPath)
if err != nil {
log.Fatalf("❌ 加载配置文件失败: %v", err)
}
fmt.Println("✅ 配置文件加载成功")
// ==========================================
// 🔧 模拟数据配置 (请根据你的K8s环境修改这里)
// ==========================================
// 假设你的 K8s 里有一个 Pod 名字叫 "czmhz-service-xxxx"
// 这里构造一个符合逻辑的 ConsumerGroup
// 逻辑是:ConsumerGroup前缀 "czmhz" -> 对应 Pod前缀 "czmhz"
simulatedConsumerGroup := "xunjian_docking_analog"
// 模拟一个超过阈值的堆积量
simulatedValue := 180000.0
fmt.Println("\n🧪 --- 开始模拟告警触发测试 ---")
fmt.Printf(" 模拟消费者组: %s\n", simulatedConsumerGroup)
fmt.Printf(" 模拟堆积量: %.0f\n", simulatedValue)
// 3. 测试前缀提取
prefix := aliyun.ExtractPrefix(simulatedConsumerGroup)
fmt.Printf(" 提取前缀: %s\n", prefix)
if prefix == "" {
log.Fatalf("❌ 前缀提取失败,程序终止")
}
// 4. 测试 K8s Pod 重启逻辑
// 注意:这会真的去重启你 K8s 环境中匹配该前缀的 Pod!
// 如果你在本地运行,请确保你的 kubeconfig 配置正确且能连接到集群
fmt.Println("\n🔄 [测试步骤 1/2] 正在尝试连接K8s并重启Pod...")
err = aliyun.RestartMatchingPods(config, prefix)
if err != nil {
fmt.Printf("❌ K8s重启测试失败: %v\n", err)
fmt.Println(" (请检查: 1.kubeconfig是否正确 2.命名空间是否正确 3.是否有匹配该前缀的Pod)")
} else {
fmt.Println("✅ K8s重启测试执行完毕 (请观察K8s集群中Pod是否发生重建)")
}
// 5. 测试 钉钉通知发送逻辑
fmt.Println("\n📨 [测试步骤 2/2] 正在尝试发送钉钉消息...")
// 这里我们手动调用发送函数
aliyun.SendDingTalkNotification(config, simulatedConsumerGroup, simulatedValue)
fmt.Println("\n🏁 测试脚本执行结束")
}
main.go
package main
import (
"fmt"
"kafka-monitor/aliyun"
"log"
"os"
"strings"
)
func main() {
// 检查配置文件参数,确保提供了配置文件路径。
if len(os.Args) < 2 {
fmt.Printf("使用方法: %s <配置文件路径>\n", os.Args[0])
os.Exit(1)
}
configPath := os.Args[1]
// 从命令行参数获取配置文件路径并加载配置。
config, err := aliyun.LoadConfig(configPath)
if err != nil {
log.Fatalf("❌ 加载配置文件失败: %v", err)
}
fmt.Println("🔗 开始连接阿里云监控服务...")
// 获取告警状态
status, err := aliyun.GetAlarmStatus(
config.Aliyun.AccessKeyID,
config.Aliyun.AccessKeySecret,
config.Aliyun.RegionID,
config.Aliyun.AlarmRuleID,
)
if err != nil {
log.Fatalf("❌ 获取告警状态失败: %v", err)
}
fmt.Println("✅ 阿里云监控连接成功!")
fmt.Printf("\n📊 告警规则状态:\n")
fmt.Printf(" 规则ID: %s\n", status.RuleID)
fmt.Printf(" 状态: %s\n", status.Status)
if status.Status != aliyun.StatusAlarm {
fmt.Println("✅ 当前状态正常 (OK),未拉取详细监控数据。")
return
}
// 只有当状态是 ALARM 时,再去获取监控数据
fmt.Println("⚠️ 云监控触发报警,正在分析详细数据...")
targetSuffix := "docking_analog" // 目标后缀
targetThreshold := 150000.0 // 目标堆积阈值
foundProblem := false
if len(status.MetricData) == 0 {
fmt.Println("⚠️ 状态为 ALARM 但未获取到监控数据 (可能是数据延迟)")
}
// 检查告警阈值
for _, metric := range status.MetricData {
// 条件:ConsumerGroup 以 docking_analog 结尾且堆积量超过阈值。
if strings.HasSuffix(metric.ConsumerGroup, targetSuffix) && metric.Value >= targetThreshold {
foundProblem = true
fmt.Println("\n----------------------------------------")
fmt.Printf("🚨 发现异常ConsumerGroup: %s\n", metric.ConsumerGroup)
fmt.Printf(" 当前堆积: %.0f\n", metric.Value)
// 提取前缀
prefix := aliyun.ExtractPrefix(metric.ConsumerGroup)
// 打印信息
fmt.Println("----------------------------------------")
fmt.Printf("🚨 [严重堆积] 发现目标消费者组!\n")
fmt.Printf(" 消费者组: %s\n", metric.ConsumerGroup)
fmt.Printf(" 前缀提取: %s\n", prefix)
fmt.Printf(" 当前堆积: %.0f (阈值: %.0f)\n", metric.Value, targetThreshold)
fmt.Printf(" 实例ID: %s\n", metric.InstanceID)
fmt.Println("----------------------------------------")
fmt.Println(" 🔄 正在尝试重启关联Pod...")
err := aliyun.RestartMatchingPods(config, prefix)
if err != nil {
log.Printf(" ❌ 重启Pod失败: %v\n", err)
// 即使重启失败,可能也需要发钉钉通知告知失败,或者继续执行
}
// C. 发送钉钉通知
fmt.Println(" 📨 正在发送钉钉通知...")
aliyun.SendDingTalkNotification(config, metric.ConsumerGroup, metric.Value)
fmt.Println("----------------------------------------")
}
}
if !foundProblem && len(status.MetricData) > 0 {
fmt.Printf("ℹ️ 虽有报警,但未发现后缀为 '%s' 且堆积量超过 %.0f 的组。\n", targetSuffix, targetThreshold)
}
}
config.json
{
"aliyun": {
"accessKeyId": "your-access-key-id",
"accessKeySecret": "your-access-key-secret",
"regionId": "cn-hangzhou",
"alarmRuleId": "your-alarm-rule-id"
},
"k8s": {
"namespace": "default"
},
"dingtalk": {
"webhookUrl": "xxx",
"secret": "your-sign-secret"
}
}
README.md
# Kafka Monitor
Kafka监控工具,用于监控阿里云Kafka消费者组的消息堆积情况,并在满足条件时自动重启相关的K8s Pod并发送钉钉通知。
# 功能
1. 监控阿里云Kafka消费者组的消息堆积量
2. 当消息堆积量超过阈值(150000)且消费者组以`docking_analog`结尾时,自动提取consumerGroup前缀
3. 在K8s集群中查找并重启包含该前缀的Pod
4. 发送钉钉通知告知操作结果
# 配置
在使用之前,需要修改`config.json`文件中的配置信息:
{
"aliyun": {
"accessKeyId": "your-access-key-id",
"accessKeySecret": "your-access-key-secret",
"regionId": "cn-hangzhou",
"alarmRuleId": "your-alarm-rule-id"
},
"k8s": {
"namespace": "default"
},
"dingtalk": {
"webhookUrl": "xxx",
"secret": "your-sign-secret"
}
}
# 使用方法
1. 确保已经安装Go环境(1.23版本)
2. 确保K8s集群的kubeconfig文件位于`~/.kube/config`
3. 修改`config.json`中的配置信息
4. 编译并运行程序:
go build -o kafka-monitor main.go
./kafka-monitor config.json
# 测试脚本
项目提供了一个测试脚本用于验证K8s Pod重启和钉钉通知功能:
go run test_trigger.go config.json
# 工作原理
1. 程序启动后会读取配置文件
2. 调用阿里云CMS API获取指定告警规则的状态
3. 如果告警状态为"ALARM"且消息堆积量超过150000,则:
- 从consumerGroup中提取第一个下划线前的前缀
- 在K8s集群的指定命名空间中查找包含该前缀的Pod
- 删除这些Pod,让K8s自动重建
- 发送钉钉通知告知操作完成
# 注意事项
- 确保阿里云访问密钥具有CMS API的访问权限
- 确保K8s配置文件具有足够的权限来列出和删除Pod
- 钉钉机器人需要配置加签验证
测试
参见上面的README.md文件,构建二进制包:
go build -o kafka-monitor main.go
./kafka-monitor config.json

使用test_trigger.go测试pod重启和消息发送:



更新
修改main.go,支持指定日志路径和单次执行或者指定时间间隔执行。
package main
import (
"flag"
"fmt"
"kafka-monitor/aliyun"
"log"
"os"
"strings"
"time"
)
func main() {
// 定义命令行参数
configPath := flag.String("config", "", "配置文件路径")
logPath := flag.String("log", "", "日志文件路径 (可选)")
interval := flag.Int("interval", 0, "检查间隔(分钟),0表示单次执行 (可选)")
flag.Parse()
// 如果没有通过flag指定config,则检查位置参数
if *configPath == "" {
if len(os.Args) < 2 {
fmt.Printf("使用方法: %s -config=<配置文件路径> [-log=<日志文件路径>] [-interval=<检查间隔(分钟)>]\n", os.Args[0])
fmt.Printf("或者: %s <配置文件路径>\n", os.Args[0])
os.Exit(1)
}
// 兼容旧的命令行参数方式
configPath = &os.Args[1]
}
// 如果指定了日志文件路径,则设置日志输出到文件
if *logPath != "" {
logFile, err := os.OpenFile(*logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatal("无法打开日志文件:", err)
}
defer logFile.Close()
log.SetOutput(logFile)
}
// 如果指定了检查间隔,则启用定时模式
if *interval > 0 {
ticker := time.NewTicker(time.Duration(*interval) * time.Minute)
defer ticker.Stop()
fmt.Printf("⏰ 定时监控任务已启动,每%d分钟检查一次告警情况...\n", *interval)
for {
fmt.Println("\n🔍 开始检查告警情况...")
checkAndHandleAlarms(*configPath)
<-ticker.C
}
} else {
// 单次执行模式
checkAndHandleAlarms(*configPath)
}
}
func checkAndHandleAlarms(configPath string) {
// 记录开始时间
startTime := time.Now()
log.Printf("[INFO] 开始检查告警情况,时间: %v", startTime)
// 加载配置
config, err := aliyun.LoadConfig(configPath)
if err != nil {
log.Printf("[ERROR] 加载配置文件失败: %v", err)
return
}
fmt.Println("🔗 开始连接阿里云监控服务...")
// 获取告警状态
status, err := aliyun.GetAlarmStatus(
config.Aliyun.AccessKeyID,
config.Aliyun.AccessKeySecret,
config.Aliyun.RegionID,
config.Aliyun.AlarmRuleID,
)
if err != nil {
log.Printf("[ERROR] 获取告警状态失败: %v", err)
return
}
fmt.Println("✅ 阿里云监控连接成功!")
fmt.Printf("\n📊 告警规则状态:\n")
fmt.Printf(" 规则ID: %s\n", status.RuleID)
fmt.Printf(" 状态: %s\n", status.Status)
if status.Status != aliyun.StatusAlarm {
fmt.Println("✅ 当前状态正常 (OK),未触发告警。")
log.Printf("[INFO] 当前状态正常 (OK),未触发告警")
return
}
// 只有当状态是 ALARM 时,才处理告警
fmt.Println("⚠️ 云监控触发报警,正在分析详细数据...")
log.Printf("[WARN] 云监控触发报警,正在分析详细数据...")
targetSuffix := "docking_analog" // 目标后缀
targetThreshold := 150000.0 // 目标堆积阈值
foundProblem := false
if len(status.MetricData) == 0 {
fmt.Println("⚠️ 状态为 ALARM 但未获取到监控数据 (可能是数据延迟)")
log.Printf("[WARN] 状态为 ALARM 但未获取到监控数据 (可能是数据延迟)")
}
// 检查告警阈值
for _, metric := range status.MetricData {
// 条件:ConsumerGroup 以 docking_analog 结尾且堆积量超过阈值。
if strings.HasSuffix(metric.ConsumerGroup, targetSuffix) && metric.Value >= targetThreshold {
foundProblem = true
fmt.Println("\n----------------------------------------")
fmt.Printf("🚨 发现异常ConsumerGroup: %s\n", metric.ConsumerGroup)
fmt.Printf(" 当前堆积: %.0f\n", metric.Value)
// 提取前缀
prefix := aliyun.ExtractPrefix(metric.ConsumerGroup)
// 打印信息
fmt.Println("----------------------------------------")
fmt.Printf("🚨 [严重堆积] 发现目标消费者组!\n")
fmt.Printf(" 消费者组: %s\n", metric.ConsumerGroup)
fmt.Printf(" 前缀提取: %s\n", prefix)
fmt.Printf(" 当前堆积: %.0f (阈值: %.0f)\n", metric.Value, targetThreshold)
fmt.Printf(" 实例ID: %s\n", metric.InstanceID)
fmt.Println("----------------------------------------")
fmt.Println(" 🔄 正在尝试重启关联Pod...")
err := aliyun.RestartMatchingPods(config, prefix)
if err != nil {
log.Printf(" ❌ 重启Pod失败: %v", err)
// 即使重启失败,可能也需要发钉钉通知告知失败,或者继续执行
}
// 发送钉钉通知
fmt.Println(" 📨 正在发送钉钉通知...")
aliyun.SendDingTalkNotification(config, metric.ConsumerGroup, metric.Value)
fmt.Println("----------------------------------------")
}
}
if !foundProblem && len(status.MetricData) > 0 {
fmt.Printf("ℹ️ 虽有报警,但未发现后缀为 '%s' 且堆积量超过 %.0f 的组。\n", targetSuffix, targetThreshold)
log.Printf("[INFO] 虽有报警,但未发现后缀为 '%s' 且堆积量超过 %.0f 的组。", targetSuffix, targetThreshold)
}
// 记录结束时间
endTime := time.Now()
duration := endTime.Sub(startTime)
fmt.Println("✅ 告警检查完成")
log.Printf("[INFO] 告警检查完成,耗时: %v", duration)
}
配置定时任务,每十分钟检查一次告警状态,日志输出到/var/log/kafka-monitor.log,每3天清空/var/log/kafka-monitor.log日志。
*/10 * * * * /root/kafka-monitor -config=config.json -log=/var/log/kafka-monitor.log
0 0 */3 * * > /var/log/kafka-monitor.log
