k8s

阿里云kafka消息堆积自动恢复

背景

我的环境有个kafka线上告警,当前kafka消息堆积到20万时,会发送群通知和打电话。告警规则如下:

file

我不想让他打电话。目前我这里出现堆积的都是以_docking_analog为结尾的consumerGroup,而恢复的方法都是重启对应消费组的pod,那为什么不在消息还没到20万时,就把pod重启了,这样就不会consumerGroup,而打电话了,告警会自动恢复。

项目结构

我用go实现的。

  • 首先需要对接阿里云云监控的SDK,获取对应告警规则的返回结果。
  • 然后根据告警触发时获取监控数据。如果consumerGroup包括_docking_analog的前缀。且堆积量大于15万,获取_docking_analog的前缀。
  • 再根据前缀到k8s中获取对应的pod并重启。
  • 最后再发送钉钉消息到群里。
  • 然后配置文件单独拎出来,包括ak,sk,告警规则id,region,钉钉的webhookUrl,secret,k8s的namespace。

这样项目结构如下:

file

实现

首先在阿里云openapi页面调用DescribeMetricRuleList接口,填写RuleIds告警规则id,发起调用。

file
下面是一部分返回结果:

{
  "RequestId": "CB880416-5453-5290-A9AF-BC153F350E09",
  "Total": 1,
  "Alarms": {
    "Alarm": [
      {
        "GroupName": "",
        "NoEffectiveInterval": "",
        "MailSubject": "${serviceType}-${metricName}-${levelDescription}通知(${dimensions})",
        "GmtUpdate": 1749087864000,
        "SourceType": "METRIC",
        "RuleId": "clone_uuid_bfb66e7cba10df82812348d5",
        "Prometheus": {},
        "MetricName": "message_accumulation",
        "CompositeExpression": {},
        "RuleName": "复制规则_消费堆积报警",
        "SilenceTime": 21600,
        "ContactGroups": "kafka",
        "GroupBy": "",
        "Period": 60,
        "Dimensions": "",
        "RuleType": "STATIC",
        "EffectiveInterval": "09:00-17:59",
        "NoDataPolicy": "KEEP_LAST_STATE",
        "Namespace": "acs_kafka",
        "AlertState": "OK",
        "GroupId": "",
        "GmtCreate": 1733896962000,
        "EnableState": true,
        "ProductCategory": "kafka",
        "Escalations": {
          "Critical": {
            "ComparisonOperator": "GreaterThanOrEqualToThreshold",
            "Times": 3,
            "Statistics": "Value",
            "Threshold": "200000"
          },
          "Info": {
            "ComparisonOperator": "GreaterThanOrEqualToThreshold",
            "Times": 10,
            "Statistics": "Value",
            "Threshold": "15000"
          },
          "Warn": {
            "ComparisonOperator": "GreaterThanOrEqualToThreshold",
            "Times": 3,
            "Statistics": "Value",
            "Threshold": "50000"
          }
        },
        "Webhook": "",
        "Resources": "[{\"instanceId\":\"alikafka_pre-cn-83l39005\",\"requestRegionId\":\"cn-hangzhou\",\"consumerGroup\":\"xyjxy_docking_analog\"},{\"instanceId\":\"alikafka_pre-cn-83l39005\",\"requestRegionId\":\"cn-hangzhou\",\"consumerGroup\":\"cust_docking_analog\"},]",
        "Interval": 60
      }
    ]
  },
  "Code": "200",
  "Success": true
}

这是阿里云云监控sdk文档地址:https://api.aliyun.com/api-tools/sdk/Cms?spm=api-workbench.api_explorer.0.0.e90655ab0U4DVi&version=2019-01-01&language=go-tea&tab=primer-doc

https://api.aliyun.com/document/Cms/2019-01-01/DescribeMetricList

定义的结构体中的字段需要和sdk中的字段保持一致,否则会查询不到对应的告警规则。比如AlertState,RuleId。

aliyun.go

调用DescribeMetricRuleList接口获取告警的状态,如果告警状态为ALARM时,进一步调用DescribeMetricList接口来获取详细的监控数据。再重启pod,发送通知。

package aliyun

import (
    "encoding/json"
    "fmt"
    openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client"
    openapiutil "github.com/alibabacloud-go/openapi-util/service"
    util "github.com/alibabacloud-go/tea-utils/v2/service"
    "github.com/alibabacloud-go/tea/tea"
    credential "github.com/aliyun/credentials-go/credentials"
    "kafka-monitor/dingtalk"
    "kafka-monitor/k8s"
)

const (
    // 定义阿里云API调用所需的各种常量,包括API版本、协议、认证类型等,以及告警状态常量。
    apiVersion     = "2019-01-01"
    apiProtocol    = "HTTPS"
    apiMethod      = "POST"
    apiAuthType    = "AK"
    apiStyle       = "RPC"
    apiPathname    = "/"
    apiReqBodyType = "json"
    apiBodyType    = "json"
    namespaceKafka = "acs_kafka"
    metricName     = "message_accumulation"
    StatusOK    = "OK"
    StatusAlarm = "ALARM"
)

// MetricData 监控数据结构,包含实例ID、消费者组、堆积量值和时间戳。
type MetricData struct {
    InstanceID    string  `json:"instanceId"`
    ConsumerGroup string  `json:"consumerGroup"`
    Value         float64 `json:"value"`
    Timestamp     int64   `json:"timestamp"`
}

// AlarmStatus 告警状态,包含规则ID、状态和监控数据列表。
type AlarmStatus struct {
    RuleID     string       `json:"ruleId"`
    Status     string       `json:"status"` // OK or ALARM
    MetricData []MetricData `json:"metricData"`
}

// APIResponse 基础API响应,包含状态码、消息和成功标志。
type APIResponse struct {
    Code    string `json:"Code"`
    Message string `json:"Message"`
    Success bool   `json:"Success"`
}

// AlarmRuleResponse 告警规则响应,用于解析阿里云API返回的告警规则信息。
type AlarmRuleResponse struct {
    APIResponse
    Alarms struct {
        Alarm []struct {
            // API返回的字段名是 "RuleId"
            RuleID string `json:"RuleId"`
            // API返回的字段名是 "AlertState"
            State string `json:"AlertState"`
            // 增加 Resources 字段解析,方便调试或后续过滤
            Resources string `json:"Resources"`
        } `json:"Alarm"`
    } `json:"Alarms"`
}

// ClientWrapper 阿里云客户端包装器,包含阿里云客户端实例和区域ID。
type ClientWrapper struct {
    client   *openapi.Client
    regionID string
}

// NewClientWrapper 创建新的客户端包装器,用于与阿里云CMS API进行交互。
func NewClientWrapper(accessKeyID, accessKeySecret, regionID string) (*ClientWrapper, error) {
    credConf := &credential.Config{
        Type:            tea.String("access_key"),
        AccessKeyId:     tea.String(accessKeyID),
        AccessKeySecret: tea.String(accessKeySecret),
    }

    cred, err := credential.NewCredential(credConf)
    if err != nil {
        return nil, fmt.Errorf("创建凭据失败: %w", err)
    }

    config := &openapi.Config{
        Credential: cred,
        Endpoint:   tea.String(fmt.Sprintf("metrics.%s.aliyuncs.com", regionID)),
    }

    client, err := openapi.NewClient(config)
    if err != nil {
        return nil, fmt.Errorf("创建CMS客户端失败: %w", err)
    }

    return &ClientWrapper{
        client:   client,
        regionID: regionID,
    }, nil
}

// baseParams 创建基础API参数,用于阿里云API调用。
func baseParams(action string) *openapi.Params {
    return &openapi.Params{
        Action:      tea.String(action),
        Version:     tea.String(apiVersion),
        Protocol:    tea.String(apiProtocol),
        Method:      tea.String(apiMethod),
        AuthType:    tea.String(apiAuthType),
        Style:       tea.String(apiStyle),
        Pathname:    tea.String(apiPathname),
        ReqBodyType: tea.String(apiReqBodyType),
        BodyType:    tea.String(apiBodyType),
    }
}

// GetAlarmStatus 获取指定告警规则的状态信息。创建客户端包装器并调用具体实现。
func GetAlarmStatus(accessKeyID, accessKeySecret, regionID, alarmRuleID string) (*AlarmStatus, error) {
    clientWrapper, err := NewClientWrapper(accessKeyID, accessKeySecret, regionID)
    if err != nil {
        return nil, err
    }

    return clientWrapper.getAlarmStatus(alarmRuleID)
}

// getAlarmStatus 获取告警状态的具体实现
func (cw *ClientWrapper) getAlarmStatus(alarmRuleID string) (*AlarmStatus, error) {
    // 获取告警规则列表
    ruleResponse, err := cw.DescribeMetricRuleList(alarmRuleID)
    if err != nil {
        return nil, err
    }

    // 查找指定的告警规则
    var alarmStatus *AlarmStatus

    for _, rule := range ruleResponse.Alarms.Alarm {
        // 调试输出
        // fmt.Printf("找到规则: ID=%s, State=%s\n", rule.RuleID, rule.State)

        if rule.RuleID == alarmRuleID {
            alarmStatus = &AlarmStatus{
                RuleID: alarmRuleID,
                Status: rule.State,
            }
            break
        }
    }

    if alarmStatus == nil {
        return nil, fmt.Errorf("未找到告警规则: %s", alarmRuleID)
    }

    // 如果告警状态为ALARM,获取详细的监控数据
    if alarmStatus.Status == StatusAlarm {
        metricData, err := cw.getMetricData()
        if err != nil {
            return nil, fmt.Errorf("获取监控数据失败: %w", err)
        }
        alarmStatus.MetricData = metricData
    }

    return alarmStatus, nil
}

// DescribeMetricRuleList 获取告警规则列表
func (cw *ClientWrapper) DescribeMetricRuleList(alarmRuleID string) (*AlarmRuleResponse, error) {
    params := baseParams("DescribeMetricRuleList")

    queries := map[string]interface{}{
        "RuleIds": alarmRuleID,
    }

    request := &openapi.OpenApiRequest{
        Query: openapiutil.Query(queries),
    }

    resp, err := cw.client.CallApi(params, request, &util.RuntimeOptions{})
    if err != nil {
        return nil, fmt.Errorf("调用DescribeMetricRuleList失败: %w", err)
    }

    var response AlarmRuleResponse
    if err := cw.parseResponse(resp, &response); err != nil {
        return nil, err
    }

    if !response.Success {
        return nil, fmt.Errorf("API调用失败: %s - %s", response.Code, response.Message)
    }

    return &response, nil
}

// getMetricData 通过调用DescribeMetricList接口获取Kafka消息堆积监控数据。
func (cw *ClientWrapper) getMetricData() ([]MetricData, error) {
    params := baseParams("DescribeMetricList")

    queries := map[string]interface{}{
        "Namespace":  namespaceKafka,
        "MetricName": metricName,
        "Period":     "60",  // 指定周期,通常 Kafka 堆积量是 60s 一个点
        "Length":     "100", // 获取最近的数据点数量,防止拉取过多数据
    }

    request := &openapi.OpenApiRequest{
        Query: openapiutil.Query(queries),
    }

    resp, err := cw.client.CallApi(params, request, &util.RuntimeOptions{})
    if err != nil {
        return nil, fmt.Errorf("调用DescribeMetricList失败: %w", err)
    }

    return cw.parseMetricData(resp)
}

// parseMetricData 解析阿里云API返回的监控数据,提取最新的数据点并转换为内部结构。
func (cw *ClientWrapper) parseMetricData(resp map[string]interface{}) ([]MetricData, error) {
    // 1. 获取 body
    body, ok := resp["body"].(map[string]interface{})
    if !ok {
        return nil, fmt.Errorf("API响应格式错误: body字段不是map")
    }

    // 2. 检查 Success 字段
    if success, ok := body["Success"].(bool); ok && !success {
        code, _ := body["Code"].(string)
        msg, _ := body["Message"].(string)
        return nil, fmt.Errorf("API返回错误: %s - %s", code, msg)
    }

    // 3. 提取 Datapoints
    // 注意:阿里云 API 返回的 Datapoints 是一个 JSON 字符串,需要再次 Unmarshal
    datapointsStr, ok := body["Datapoints"].(string)
    if !ok || datapointsStr == "" {
        // 如果没有数据,Datapoints 可能为空或不存在,这在正常情况下是可能的
        return []MetricData{}, nil
    }

    // 定义 Datapoint 内部结构,对应阿里云返回的实际 JSON 数组项
    type CloudMonitorDatapoint struct {
        InstanceID    string  `json:"instanceId"`
        ConsumerGroup string  `json:"consumerGroup"`
        Maximum       float64 `json:"Maximum"` // 堆积量通常关注最大值或当前值
        Average       float64 `json:"Average"`
        Timestamp     int64   `json:"timestamp"`
    }

    var points []CloudMonitorDatapoint
    if err := json.Unmarshal([]byte(datapointsStr), &points); err != nil {
        return nil, fmt.Errorf("解析 Datapoints JSON失败: %w", err)
    }

    // 4. 数据清洗:保留每个 Group 最新的一个点
    latestMap := make(map[string]CloudMonitorDatapoint)

    for _, p := range points {
        // 唯一键:InstanceID + ConsumerGroup
        key := fmt.Sprintf("%s|%s", p.InstanceID, p.ConsumerGroup)

        if exist, ok := latestMap[key]; ok {
            // 如果当前点的时间更新,则替换
            if p.Timestamp > exist.Timestamp {
                latestMap[key] = p
            }
        } else {
            latestMap[key] = p
        }
    }

    // 5. 转换为 MetricData 格式返回
    var result []MetricData
    for _, p := range latestMap {
        // 优先使用 Maximum,如果为0尝试 Average (通常对于 Gauge 类型指标,Max=Min=Avg)
        val := p.Maximum
        if val == 0 {
            val = p.Average
        }

        result = append(result, MetricData{
            InstanceID:    p.InstanceID,
            ConsumerGroup: p.ConsumerGroup,
            Value:         val,
            Timestamp:     p.Timestamp,
        })
    }

    return result, nil
}

// parseResponse 解析API响应
func (cw *ClientWrapper) parseResponse(resp map[string]interface{}, target interface{}) error {
    body, exists := resp["body"]
    if !exists {
        return fmt.Errorf("响应中缺少body字段")
    }

    bodyBytes, err := json.Marshal(body)
    if err != nil {
        return fmt.Errorf("序列化响应体失败: %w", err)
    }

    if err := json.Unmarshal(bodyBytes, target); err != nil {
        return fmt.Errorf("解析响应失败: %w", err)
    }

    return nil
}

// ExtractPrefix 从consumerGroup中提取前缀(第一个下划线前的部分)
func ExtractPrefix(consumerGroup string) string {
    for i, char := range consumerGroup {
        if char == '_' {
            return consumerGroup[:i]
        }
    }
    return consumerGroup
}

// RestartMatchingPods 查找并重启匹配的Pod
func RestartMatchingPods(config *Config, prefix string) error {
    // 创建K8s客户端
    k8sClient, err := k8s.NewK8sClient(config.K8s.Namespace)
    if err != nil {
        return fmt.Errorf("创建K8s客户端失败: %v", err)
    }

    // 查找匹配的Pod
    pods, err := k8sClient.FindPodsByPrefix(prefix)
    if err != nil {
        return fmt.Errorf("查找Pod失败: %v", err)
    }

    if len(pods) == 0 {
        fmt.Printf("未找到包含前缀 '%s' 的Pod\n", prefix)
        return nil
    }

    fmt.Printf("找到匹配的Pod: %v\n", pods)

    // 重启Pod
    err = k8sClient.RestartPods(pods)
    if err != nil {
        return fmt.Errorf("重启Pod失败: %v", err)
    }

    fmt.Printf("成功重启 %d 个Pod\n", len(pods))
    return nil
}

// SendDingTalkNotification 发送钉钉通知
func SendDingTalkNotification(config *Config, consumerGroup string, value float64) {
    // 创建钉钉机器人
    robot := dingtalk.NewDingTalkRobot(config.Dingtalk.WebhookURL, config.Dingtalk.Secret)

    // 构造消息内容
    message := fmt.Sprintf("对应的consumerGroup(%s)消息堆积量超过150000,当前值为%.0f,已重启对应的pod,请继续观察消息堆积量。", consumerGroup, value)

    // 发送消息
    err := robot.SendMessage(message)
    if err != nil {
        fmt.Printf("发送钉钉消息失败: %v\n", err)
    } else {
        fmt.Println("钉钉消息发送成功")
    }
}

config.go

读取config.json配置文件中的配置。

package aliyun

import (
    "encoding/json"
    "fmt"
    "os"
)

// Config 配置文件结构
type Config struct {
    Aliyun struct {
        AccessKeyID     string `json:"accessKeyId"`
        AccessKeySecret string `json:"accessKeySecret"`
        RegionID        string `json:"regionId"`
        AlarmRuleID     string `json:"alarmRuleId"`
    } `json:"aliyun"`
    K8s struct {
        Namespace string `json:"namespace"`
    } `json:"k8s"`
    Dingtalk struct {
        WebhookURL string `json:"webhookUrl"`
        Secret     string `json:"secret"`
    } `json:"dingtalk"`
}

// LoadConfig 从文件加载配置
func LoadConfig(configPath string) (*Config, error) {
    data, err := os.ReadFile(configPath)
    if err != nil {
        return nil, fmt.Errorf("读取配置文件失败: %w", err)
    }

    var config Config
    if err := json.Unmarshal(data, &config); err != nil {
        return nil, fmt.Errorf("解析配置文件失败: %w", err)
    }

    return &config, nil
}

robot.go

定义发送钉钉机器人消息的逻辑。

package dingtalk

import (
    "bytes"
    "crypto/hmac"
    "crypto/sha256"
    "encoding/base64"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "time"
)

// DingTalkRobot 钉钉机器人
type DingTalkRobot struct {
    WebhookURL string
    Secret     string
}

// Message 钉钉消息结构
type Message struct {
    MsgType string   `json:"msgtype"`
    Text    TextBody `json:"text"`
    At      AtBody   `json:"at"`
}

// TextBody 文本消息体
type TextBody struct {
    Content string `json:"content"`
}

// AtBody @相关用户
type AtBody struct {
    AtMobiles []string `json:"atMobiles"`
    IsAtAll   bool     `json:"isAtAll"`
}

// NewDingTalkRobot 创建新的钉钉机器人实例
func NewDingTalkRobot(webhookURL, secret string) *DingTalkRobot {
    return &DingTalkRobot{
        WebhookURL: webhookURL,
        Secret:     secret,
    }
}

// GenerateSign 生成签名
func (d *DingTalkRobot) GenerateSign(timestamp int64) string {
    stringToSign := fmt.Sprintf("%d\n%s", timestamp, d.Secret)
    h := hmac.New(sha256.New, []byte(d.Secret))
    h.Write([]byte(stringToSign))
    signData := h.Sum(nil)
    return base64.StdEncoding.EncodeToString(signData)
}

// SendMessage 发送消息
func (d *DingTalkRobot) SendMessage(content string) error {
    // 获取当前时间戳
    timestamp := time.Now().UnixNano() / 1e6

    // 生成签名
    sign := d.GenerateSign(timestamp)

    // 构造完整URL
    url := fmt.Sprintf("%s&timestamp=%d&sign=%s", d.WebhookURL, timestamp, sign)

    // 构造消息体
    message := Message{
        MsgType: "text",
        Text: TextBody{
            Content: content,
        },
        At: AtBody{
            IsAtAll: false,
        },
    }

    // 序列化消息体
    jsonData, err := json.Marshal(message)
    if err != nil {
        return fmt.Errorf("序列化消息失败: %v", err)
    }

    // 创建HTTP请求
    req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
    if err != nil {
        return fmt.Errorf("创建请求失败: %v", err)
    }
    req.Header.Set("Content-Type", "application/json")

    // 发送请求
    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        return fmt.Errorf("发送请求失败: %v", err)
    }
    defer resp.Body.Close()

    // 读取响应
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return fmt.Errorf("读取响应失败: %v", err)
    }

    // 检查响应状态
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("发送消息失败,状态码: %d, 响应: %s", resp.StatusCode, string(body))
    }

    fmt.Printf("消息发送成功: %s\n", string(body))
    return nil
}

client.go

连接k8s,查找并重启pod。

package k8s

import (
    "context"
    "fmt"
    "strings"

    v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

// K8sClient K8s客户端封装
type K8sClient struct {
    Clientset *kubernetes.Clientset
    Namespace string
}

// NewK8sClient 创建新的K8s客户端
func NewK8sClient(namespace string) (*K8sClient, error) {
    // 使用kubeconfig文件创建配置
    var kubeConfig = "/root/.kube/config"
    config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
    if err != nil {
        return nil, fmt.Errorf("构建K8s配置失败: %v", err)
    }

    // 创建clientset
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        return nil, fmt.Errorf("创建K8s客户端失败: %v", err)
    }

    return &K8sClient{
        Clientset: clientset,
        Namespace: namespace,
    }, nil
}

// FindPodsByPrefix 根据前缀查找Pod
func (kc *K8sClient) FindPodsByPrefix(prefix string) ([]string, error) {
    // 列出指定命名空间中的所有Pod
    pods, err := kc.Clientset.CoreV1().Pods(kc.Namespace).List(context.TODO(), v1.ListOptions{})
    if err != nil {
        return nil, fmt.Errorf("列出Pod失败: %v", err)
    }

    var matchedPods []string
    for _, pod := range pods.Items {
        // 检查Pod名称是否包含前缀
        if strings.Contains(pod.Name, prefix) {
            matchedPods = append(matchedPods, pod.Name)
        }
    }

    return matchedPods, nil
}

// RestartPods 重启指定的Pods
func (kc *K8sClient) RestartPods(podNames []string) error {
    for _, podName := range podNames {
        // 删除Pod,K8s会自动重新创建
        err := kc.Clientset.CoreV1().Pods(kc.Namespace).Delete(context.TODO(), podName, v1.DeleteOptions{})
        if err != nil {
            return fmt.Errorf("删除Pod %s 失败: %v", podName, err)
        }
        fmt.Printf("成功删除Pod: %s\n", podName)
    }
    return nil
}

test_trigger.go

为了告警没有触发时,测试pod是否可以正常重启并发送钉钉消息。

package main

import (
    "fmt"
    "kafka-monitor/aliyun"
    "log"
    "os"
)

func main() {
    // 1. 检查参数
    if len(os.Args) < 2 {
        fmt.Printf("使用方法: go run test_trigger.go <配置文件路径>\n")
        fmt.Println("例如: go run test_trigger.go config.json")
        os.Exit(1)
    }

    configPath := os.Args[1]

    // 2. 加载配置 (获取K8s和钉钉的配置信息)
    config, err := aliyun.LoadConfig(configPath)
    if err != nil {
        log.Fatalf("❌ 加载配置文件失败: %v", err)
    }
    fmt.Println("✅ 配置文件加载成功")

    // ==========================================
    // 🔧 模拟数据配置 (请根据你的K8s环境修改这里)
    // ==========================================

    // 假设你的 K8s 里有一个 Pod 名字叫 "czmhz-service-xxxx"
    // 这里构造一个符合逻辑的 ConsumerGroup
    // 逻辑是:ConsumerGroup前缀 "czmhz" -> 对应 Pod前缀 "czmhz"
    simulatedConsumerGroup := "xunjian_docking_analog"

    // 模拟一个超过阈值的堆积量
    simulatedValue := 180000.0

    fmt.Println("\n🧪 --- 开始模拟告警触发测试 ---")
    fmt.Printf("   模拟消费者组: %s\n", simulatedConsumerGroup)
    fmt.Printf("   模拟堆积量:   %.0f\n", simulatedValue)

    // 3. 测试前缀提取
    prefix := aliyun.ExtractPrefix(simulatedConsumerGroup)
    fmt.Printf("   提取前缀:     %s\n", prefix)
    if prefix == "" {
        log.Fatalf("❌ 前缀提取失败,程序终止")
    }

    // 4. 测试 K8s Pod 重启逻辑
    // 注意:这会真的去重启你 K8s 环境中匹配该前缀的 Pod!
    // 如果你在本地运行,请确保你的 kubeconfig 配置正确且能连接到集群
    fmt.Println("\n🔄 [测试步骤 1/2] 正在尝试连接K8s并重启Pod...")
    err = aliyun.RestartMatchingPods(config, prefix)
    if err != nil {
        fmt.Printf("❌ K8s重启测试失败: %v\n", err)
        fmt.Println("   (请检查: 1.kubeconfig是否正确 2.命名空间是否正确 3.是否有匹配该前缀的Pod)")
    } else {
        fmt.Println("✅ K8s重启测试执行完毕 (请观察K8s集群中Pod是否发生重建)")
    }

    // 5. 测试 钉钉通知发送逻辑
    fmt.Println("\n📨 [测试步骤 2/2] 正在尝试发送钉钉消息...")
    // 这里我们手动调用发送函数
    aliyun.SendDingTalkNotification(config, simulatedConsumerGroup, simulatedValue)

    fmt.Println("\n🏁 测试脚本执行结束")
}

main.go

package main

import (
    "fmt"
    "kafka-monitor/aliyun"
    "log"
    "os"
    "strings"
)

func main() {
    // 检查配置文件参数,确保提供了配置文件路径。
    if len(os.Args) < 2 {
        fmt.Printf("使用方法: %s <配置文件路径>\n", os.Args[0])
        os.Exit(1)
    }

    configPath := os.Args[1]

    // 从命令行参数获取配置文件路径并加载配置。
    config, err := aliyun.LoadConfig(configPath)
    if err != nil {
        log.Fatalf("❌ 加载配置文件失败: %v", err)
    }

    fmt.Println("🔗 开始连接阿里云监控服务...")

    // 获取告警状态
    status, err := aliyun.GetAlarmStatus(
        config.Aliyun.AccessKeyID,
        config.Aliyun.AccessKeySecret,
        config.Aliyun.RegionID,
        config.Aliyun.AlarmRuleID,
    )
    if err != nil {
        log.Fatalf("❌ 获取告警状态失败: %v", err)
    }

    fmt.Println("✅ 阿里云监控连接成功!")
    fmt.Printf("\n📊 告警规则状态:\n")
    fmt.Printf("   规则ID: %s\n", status.RuleID)
    fmt.Printf("   状态: %s\n", status.Status)

    if status.Status != aliyun.StatusAlarm {
        fmt.Println("✅ 当前状态正常 (OK),未拉取详细监控数据。")
        return
    }
    // 只有当状态是 ALARM 时,再去获取监控数据
    fmt.Println("⚠️  云监控触发报警,正在分析详细数据...")

    targetSuffix := "docking_analog" // 目标后缀
    targetThreshold := 150000.0      // 目标堆积阈值

    foundProblem := false

    if len(status.MetricData) == 0 {
        fmt.Println("⚠️  状态为 ALARM 但未获取到监控数据 (可能是数据延迟)")
    }

    // 检查告警阈值
    for _, metric := range status.MetricData {
        // 条件:ConsumerGroup 以 docking_analog 结尾且堆积量超过阈值。
        if strings.HasSuffix(metric.ConsumerGroup, targetSuffix) && metric.Value >= targetThreshold {
            foundProblem = true

            fmt.Println("\n----------------------------------------")
            fmt.Printf("🚨 发现异常ConsumerGroup: %s\n", metric.ConsumerGroup)
            fmt.Printf("   当前堆积: %.0f\n", metric.Value)

            // 提取前缀
            prefix := aliyun.ExtractPrefix(metric.ConsumerGroup)

            // 打印信息
            fmt.Println("----------------------------------------")
            fmt.Printf("🚨 [严重堆积] 发现目标消费者组!\n")
            fmt.Printf("   消费者组: %s\n", metric.ConsumerGroup)
            fmt.Printf("   前缀提取: %s\n", prefix)
            fmt.Printf("   当前堆积: %.0f (阈值: %.0f)\n", metric.Value, targetThreshold)
            fmt.Printf("   实例ID:   %s\n", metric.InstanceID)
            fmt.Println("----------------------------------------")

            fmt.Println("   🔄 正在尝试重启关联Pod...")
            err := aliyun.RestartMatchingPods(config, prefix)
            if err != nil {
                log.Printf("   ❌ 重启Pod失败: %v\n", err)
                // 即使重启失败,可能也需要发钉钉通知告知失败,或者继续执行
            }

            // C. 发送钉钉通知
            fmt.Println("   📨 正在发送钉钉通知...")
            aliyun.SendDingTalkNotification(config, metric.ConsumerGroup, metric.Value)

            fmt.Println("----------------------------------------")
        }
    }

    if !foundProblem && len(status.MetricData) > 0 {
        fmt.Printf("ℹ️  虽有报警,但未发现后缀为 '%s' 且堆积量超过 %.0f 的组。\n", targetSuffix, targetThreshold)
    }
}

config.json

{
  "aliyun": {
    "accessKeyId": "your-access-key-id",
    "accessKeySecret": "your-access-key-secret",
    "regionId": "cn-hangzhou",
    "alarmRuleId": "your-alarm-rule-id"
  },
  "k8s": {
    "namespace": "default"
  },
  "dingtalk": {
    "webhookUrl": "xxx",
    "secret": "your-sign-secret"
  }
}

README.md

# Kafka Monitor

Kafka监控工具,用于监控阿里云Kafka消费者组的消息堆积情况,并在满足条件时自动重启相关的K8s Pod并发送钉钉通知。

# 功能

1. 监控阿里云Kafka消费者组的消息堆积量
2. 当消息堆积量超过阈值(150000)且消费者组以`docking_analog`结尾时,自动提取consumerGroup前缀
3. 在K8s集群中查找并重启包含该前缀的Pod
4. 发送钉钉通知告知操作结果

# 配置

在使用之前,需要修改`config.json`文件中的配置信息:

{
  "aliyun": {
    "accessKeyId": "your-access-key-id",
    "accessKeySecret": "your-access-key-secret",
    "regionId": "cn-hangzhou",
    "alarmRuleId": "your-alarm-rule-id"
  },
  "k8s": {
    "namespace": "default"
  },
  "dingtalk": {
    "webhookUrl": "xxx",
    "secret": "your-sign-secret"
  }
}

# 使用方法

1. 确保已经安装Go环境(1.23版本)
2. 确保K8s集群的kubeconfig文件位于`~/.kube/config`
3. 修改`config.json`中的配置信息
4. 编译并运行程序:

   go build -o kafka-monitor main.go
   ./kafka-monitor config.json

# 测试脚本

项目提供了一个测试脚本用于验证K8s Pod重启和钉钉通知功能:

   go run test_trigger.go config.json

# 工作原理

1. 程序启动后会读取配置文件
2. 调用阿里云CMS API获取指定告警规则的状态
3. 如果告警状态为"ALARM"且消息堆积量超过150000,则:
   - 从consumerGroup中提取第一个下划线前的前缀
   - 在K8s集群的指定命名空间中查找包含该前缀的Pod
   - 删除这些Pod,让K8s自动重建
   - 发送钉钉通知告知操作完成

# 注意事项

- 确保阿里云访问密钥具有CMS API的访问权限
- 确保K8s配置文件具有足够的权限来列出和删除Pod
- 钉钉机器人需要配置加签验证

测试

参见上面的README.md文件,构建二进制包:

go build -o kafka-monitor main.go
./kafka-monitor config.json

file

使用test_trigger.go测试pod重启和消息发送:

file

file

file

更新

修改main.go,支持指定日志路径和单次执行或者指定时间间隔执行。

package main

import (
    "flag"
    "fmt"
    "kafka-monitor/aliyun"
    "log"
    "os"
    "strings"
    "time"
)

func main() {
    // 定义命令行参数
    configPath := flag.String("config", "", "配置文件路径")
    logPath := flag.String("log", "", "日志文件路径 (可选)")
    interval := flag.Int("interval", 0, "检查间隔(分钟),0表示单次执行 (可选)")
    flag.Parse()

    // 如果没有通过flag指定config,则检查位置参数
    if *configPath == "" {
        if len(os.Args) < 2 {
            fmt.Printf("使用方法: %s -config=<配置文件路径> [-log=<日志文件路径>] [-interval=<检查间隔(分钟)>]\n", os.Args[0])
            fmt.Printf("或者: %s <配置文件路径>\n", os.Args[0])
            os.Exit(1)
        }
        // 兼容旧的命令行参数方式
        configPath = &os.Args[1]
    }

    // 如果指定了日志文件路径,则设置日志输出到文件
    if *logPath != "" {
        logFile, err := os.OpenFile(*logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
        if err != nil {
            log.Fatal("无法打开日志文件:", err)
        }
        defer logFile.Close()
        log.SetOutput(logFile)
    }

    // 如果指定了检查间隔,则启用定时模式
    if *interval > 0 {
        ticker := time.NewTicker(time.Duration(*interval) * time.Minute)
        defer ticker.Stop()

        fmt.Printf("⏰ 定时监控任务已启动,每%d分钟检查一次告警情况...\n", *interval)

        for {
            fmt.Println("\n🔍 开始检查告警情况...")
            checkAndHandleAlarms(*configPath)
            <-ticker.C
        }
    } else {
        // 单次执行模式
        checkAndHandleAlarms(*configPath)
    }
}

func checkAndHandleAlarms(configPath string) {
    // 记录开始时间
    startTime := time.Now()
    log.Printf("[INFO] 开始检查告警情况,时间: %v", startTime)

    // 加载配置
    config, err := aliyun.LoadConfig(configPath)
    if err != nil {
        log.Printf("[ERROR] 加载配置文件失败: %v", err)
        return
    }

    fmt.Println("🔗 开始连接阿里云监控服务...")

    // 获取告警状态
    status, err := aliyun.GetAlarmStatus(
        config.Aliyun.AccessKeyID,
        config.Aliyun.AccessKeySecret,
        config.Aliyun.RegionID,
        config.Aliyun.AlarmRuleID,
    )
    if err != nil {
        log.Printf("[ERROR] 获取告警状态失败: %v", err)
        return
    }

    fmt.Println("✅ 阿里云监控连接成功!")
    fmt.Printf("\n📊 告警规则状态:\n")
    fmt.Printf("   规则ID: %s\n", status.RuleID)
    fmt.Printf("   状态: %s\n", status.Status)

    if status.Status != aliyun.StatusAlarm {
        fmt.Println("✅ 当前状态正常 (OK),未触发告警。")
        log.Printf("[INFO] 当前状态正常 (OK),未触发告警")
        return
    }

    // 只有当状态是 ALARM 时,才处理告警
    fmt.Println("⚠️  云监控触发报警,正在分析详细数据...")
    log.Printf("[WARN] 云监控触发报警,正在分析详细数据...")

    targetSuffix := "docking_analog" // 目标后缀
    targetThreshold := 150000.0      // 目标堆积阈值

    foundProblem := false

    if len(status.MetricData) == 0 {
        fmt.Println("⚠️  状态为 ALARM 但未获取到监控数据 (可能是数据延迟)")
        log.Printf("[WARN] 状态为 ALARM 但未获取到监控数据 (可能是数据延迟)")
    }

    // 检查告警阈值
    for _, metric := range status.MetricData {
        // 条件:ConsumerGroup 以 docking_analog 结尾且堆积量超过阈值。
        if strings.HasSuffix(metric.ConsumerGroup, targetSuffix) && metric.Value >= targetThreshold {
            foundProblem = true

            fmt.Println("\n----------------------------------------")
            fmt.Printf("🚨 发现异常ConsumerGroup: %s\n", metric.ConsumerGroup)
            fmt.Printf("   当前堆积: %.0f\n", metric.Value)

            // 提取前缀
            prefix := aliyun.ExtractPrefix(metric.ConsumerGroup)

            // 打印信息
            fmt.Println("----------------------------------------")
            fmt.Printf("🚨 [严重堆积] 发现目标消费者组!\n")
            fmt.Printf("   消费者组: %s\n", metric.ConsumerGroup)
            fmt.Printf("   前缀提取: %s\n", prefix)
            fmt.Printf("   当前堆积: %.0f (阈值: %.0f)\n", metric.Value, targetThreshold)
            fmt.Printf("   实例ID:   %s\n", metric.InstanceID)
            fmt.Println("----------------------------------------")

            fmt.Println("   🔄 正在尝试重启关联Pod...")
            err := aliyun.RestartMatchingPods(config, prefix)
            if err != nil {
                log.Printf("   ❌ 重启Pod失败: %v", err)
                // 即使重启失败,可能也需要发钉钉通知告知失败,或者继续执行
            }

            // 发送钉钉通知
            fmt.Println("   📨 正在发送钉钉通知...")
            aliyun.SendDingTalkNotification(config, metric.ConsumerGroup, metric.Value)

            fmt.Println("----------------------------------------")
        }
    }

    if !foundProblem && len(status.MetricData) > 0 {
        fmt.Printf("ℹ️  虽有报警,但未发现后缀为 '%s' 且堆积量超过 %.0f 的组。\n", targetSuffix, targetThreshold)
        log.Printf("[INFO] 虽有报警,但未发现后缀为 '%s' 且堆积量超过 %.0f 的组。", targetSuffix, targetThreshold)
    }

    // 记录结束时间
    endTime := time.Now()
    duration := endTime.Sub(startTime)
    fmt.Println("✅ 告警检查完成")
    log.Printf("[INFO] 告警检查完成,耗时: %v", duration)
}

配置定时任务,每十分钟检查一次告警状态,日志输出到/var/log/kafka-monitor.log,每3天清空/var/log/kafka-monitor.log日志。

*/10 * * * * /root/kafka-monitor -config=config.json -log=/var/log/kafka-monitor.log
0 0 */3 * * > /var/log/kafka-monitor.log

file

分类: k8s
0 0 投票数
文章评分
订阅评论
提醒
guest

0 评论
最旧
最新 最多投票
内联反馈
查看所有评论

相关文章

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部
0
希望看到您的想法,请您发表评论x