go, k8s

发送hpa扩容通知消息到钉钉群

步骤

我的环境k8s版本为v1.20.11,hpa的api版本没有autoscaling/v2,配置behavior需要使用autoscaling/v2beta2。

file

部署测试服务

# Test workload: a single-replica nginx Deployment. It is the scale target
# referenced by the HorizontalPodAutoscaler defined later in this document.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
        - name: nginx
          image: nginx:latest
          imagePullPolicy: IfNotPresent
          # CPU request and limit for the container; the HPA in this document
          # scales this Deployment on its CPU utilization.
          resources:
            requests:
              cpu: 200m
            limits:
              cpu: 500m
---
# ClusterIP Service named "nginx" that forwards TCP port 80 to port 80 of the
# pods labelled app=nginx.
apiVersion: v1
kind: Service
metadata:
  labels:
    app: nginx
  name: nginx
spec:
  ports:
  - name: 80-80
    port: 80
    protocol: TCP
    targetPort: 80
  selector:
    app: nginx
  type: ClusterIP

创建hpa

# HPA for the nginx Deployment: scales between 1 and 5 replicas on CPU
# utilization, with a rate-limited scale-up policy and scale-down disabled.
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
  name: nginx-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: nginx
  minReplicas: 1   # minimum replica count
  maxReplicas: 5   # maximum replica count
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 30  # scale-up stabilization window in seconds (author's note: the HPA looks back over the past 30s when deciding to scale up, so it reacts quickly to load increases)
      policies:
      - type: Pods
        value: 2       # add at most 2 replicas per policy period
        periodSeconds: 30
    scaleDown: # automatic scale-down is disabled
      #stabilizationWindowSeconds: 3600
      #policies:
      #- type: Pods
      #  value: 1
      #  periodSeconds: 1800
      selectPolicy: Disabled
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 80

创建压测pod

# Start a throwaway busybox pod that requests the "nginx" Service in an endless
# loop to generate CPU load; the pod is removed when the command exits (--rm).
kubectl run -it --rm load-generator --image=busybox --restart=Never -- /bin/sh -c "while true; do wget -q -O- http://nginx; done"

观察扩容

# Periodically re-run the listing of HPAs and pods to observe the scale-up.
watch kubectl get hpa,pods

file

由于 scaleDown 的 selectPolicy 设置为 Disabled(禁用自动缩容),扩容出来的pod没有被自动删除。

file

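扩容时和扩容完成时发送消息到钉钉。代码用go实现,文档地址:https://pkg.go.dev/k8s.io/api/autoscaling/v2@v0.32.3 。注意:下面的代码使用 client-go 的 AutoscalingV2() 客户端,访问的是 autoscaling/v2 这个 API 版本(Kubernetes v1.23 起提供);在上文提到的只有 autoscaling/v2beta2 的 v1.20.11 集群上,需要改用 AutoscalingV2beta2() 客户端以及 k8s.io/api/autoscaling/v2beta2 中的类型。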

package main

import (
    "bytes"
    "context"
    "crypto/hmac"
    "crypto/sha256"
    "encoding/base64"
    "encoding/json"
    "fmt"
    "net/http"
    "net/url"
    "strings"
    "time"

    autoscalingv2 "k8s.io/api/autoscaling/v2"
    "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

// DingTalk robot credentials used when building and signing the webhook request.
// NOTE(review): both values are hard-coded in the source (and truncated in this
// listing); consider supplying them through environment variables or a
// Kubernetes Secret instead.
const (
    // dingdingWebhook is the robot webhook URL including its access_token query parameter.
    dingdingWebhook = "https://oapi.dingtalk.com/robot/send?access_token=5e53ec。。。"
    // dingdingSecret is the signing secret; generateSign uses it as the HMAC key.
    dingdingSecret = "SEC6d95dc。。。"
)

var (
    // lastScaleEvent maps an HPA key ("namespace/name") to the time the last
    // scale-up notification was sent for it. handleHPAEvent reads and writes it;
    // cleanupOldEvents removes entries older than five minutes.
    lastScaleEvent = make(map[string]time.Time) // used for event de-duplication
)

// DingdingMsg is the JSON body posted to the DingTalk robot webhook.
type DingdingMsg struct {
    // Msgtype is serialized as "msgtype"; sendDingdingMessage sets it to "text".
    Msgtype string `json:"msgtype"`
    // Text carries the message body and is serialized as {"text": {"content": "..."}}.
    Text    struct {
        Content string `json:"content"`
    } `json:"text"`
}

// main builds a Kubernetes client, watches HorizontalPodAutoscaler objects in
// all namespaces and passes every received HPA to handleHPAEvent. Every 30
// seconds it also prunes stale de-duplication entries via cleanupOldEvents.
//
// The client configuration is loaded from /root/.kube/config; when that fails
// (for example when running inside a pod) the in-cluster configuration is used.
// A failure to build the client or to open the first watch panics. Once a watch
// has been established, later closures are handled by re-opening the watch.
//
// NOTE(review): AutoscalingV2() addresses the autoscaling/v2 API version;
// confirm that the target cluster serves it.
func main() {
    kubeConfig := "/root/.kube/config"
    // Initialise the Kubernetes client. With both arguments empty,
    // BuildConfigFromFlags falls back to the in-cluster configuration.
    config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
    if err != nil {
        config, err = clientcmd.BuildConfigFromFlags("", "")
    }
    if err != nil {
        panic(err.Error())
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err.Error())
    }

    // A ticker fires independently of event traffic. A time.After created on
    // every loop iteration is restarted by each event and may never fire.
    cleanupTicker := time.NewTicker(30 * time.Second)
    defer cleanupTicker.Stop()

    established := false
    for {
        // Watch HPA events in all namespaces.
        hpaWatcher, err := clientset.AutoscalingV2().HorizontalPodAutoscalers("").Watch(context.TODO(), v1.ListOptions{})
        if err != nil {
            if !established {
                panic(err.Error())
            }
            fmt.Println("重新建立 HPA watch 失败:", err)
            time.Sleep(5 * time.Second)
            continue
        }
        established = true

        // Event loop for the current watch; it ends when the result channel is
        // closed, which the API server does routinely for long-running watches.
        events := hpaWatcher.ResultChan()
        for open := true; open; {
            select {
            case event, ok := <-events:
                if !ok {
                    // Receiving from a closed channel returns immediately with
                    // a zero value; leave the loop instead of spinning on it.
                    open = false
                    continue
                }

                // A nil object or a non-HPA object (e.g. a Status carried by an
                // error event) fails the assertion and is skipped.
                hpa, isHPA := event.Object.(*autoscalingv2.HorizontalPodAutoscaler)
                if !isHPA {
                    continue
                }

                handleHPAEvent(hpa)

            case <-cleanupTicker.C:
                // Periodically drop old de-duplication entries.
                cleanupOldEvents()
            }
        }

        hpaWatcher.Stop()
        // Short pause so a server that closes watches immediately is not hammered.
        time.Sleep(time.Second)
    }
}

// handleHPAEvent sends a "scale-up started" DingTalk notification when the
// HPA's desired replica count exceeds its current replica count.
//
// Notifications are de-duplicated per "namespace/name": if one was recorded in
// lastScaleEvent less than 30 seconds ago the event is ignored. After sending,
// a goroutine running checkScalingCompletion is started and the send time is
// recorded in lastScaleEvent.
func handleHPAEvent(hpa *autoscalingv2.HorizontalPodAutoscaler) {
    key := fmt.Sprintf("%s/%s", hpa.Namespace, hpa.Name)

    // De-duplication guard: a notification for this HPA was sent recently.
    if lastSent := lastScaleEvent[key]; time.Since(lastSent) < 30*time.Second {
        return
    }

    // Only a pending scale-up (desired > current) is of interest.
    current, desired := hpa.Status.CurrentReplicas, hpa.Status.DesiredReplicas
    if desired <= current {
        return
    }

    const format = "HPA 扩容触发:\n" +
        "服务: %s/%s\n" +
        "当前副本数: %d\n" +
        "目标副本数: %d\n" +
        "触发条件: %s\n" +
        "%s\n" +
        "时间: %s"

    trigger := getTriggerConditions(hpa)
    metrics := getMetricsDetails(hpa)
    stamp := time.Now().Format("2006-01-02 15:04:05")
    text := fmt.Sprintf(format, hpa.Namespace, hpa.Name, current, desired, trigger, metrics, stamp)

    sendDingdingMessage(text, "hpa扩容开始")

    // Poll asynchronously until the scale-up has completed.
    go checkScalingCompletion(hpa)
    lastScaleEvent[key] = time.Now()
}

// getMetricsDetails renders one entry per resource metric in the HPA spec,
// showing the metric name, its current value and its target value.
//
// Target and current values are taken from AverageUtilization (formatted as a
// percentage) when set, otherwise from AverageValue; if neither is set the
// value is left empty. The current value comes from the status metrics with
// the same resource name; when several match, the last one providing a value
// wins. A fixed placeholder string is returned when the spec contains no
// resource metrics.
func getMetricsDetails(hpa *autoscalingv2.HorizontalPodAutoscaler) string {
    var entries []string

    for _, spec := range hpa.Spec.Metrics {
        if spec.Type != autoscalingv2.ResourceMetricSourceType || spec.Resource == nil {
            continue
        }
        name := spec.Resource.Name

        // Target value from the spec.
        var target string
        switch tgt := spec.Resource.Target; {
        case tgt.AverageUtilization != nil:
            target = fmt.Sprintf("%d%%", *tgt.AverageUtilization)
        case tgt.AverageValue != nil:
            target = tgt.AverageValue.String()
        }

        // Current value from the status; no early exit, so a later match
        // overrides an earlier one.
        var current string
        for _, observed := range hpa.Status.CurrentMetrics {
            if observed.Resource == nil || observed.Resource.Name != name {
                continue
            }
            switch cur := observed.Resource.Current; {
            case cur.AverageUtilization != nil:
                current = fmt.Sprintf("%d%%", *cur.AverageUtilization)
            case cur.AverageValue != nil:
                current = cur.AverageValue.String()
            }
        }

        entries = append(entries, fmt.Sprintf("资源指标: %s, 当前值: %s, 目标值: %s", name, current, target))
    }

    if len(entries) == 0 {
        return "没有可用的指标详情"
    }
    return fmt.Sprintf("指标详情: [%s]", stringJoin(entries, ", "))
}

// stringJoin concatenates items, placing separator between consecutive
// elements. It returns "" for an empty or nil slice.
//
// It delegates to strings.Join, which sizes the result once instead of
// re-allocating the string on every appended element.
func stringJoin(items []string, separator string) string {
    return strings.Join(items, separator)
}

// getTriggerConditions collects the messages of all status conditions of type
// ScalingActive and returns them in fmt's default slice notation, e.g.
// "[msg1 msg2]", or "[]" when there is none.
func getTriggerConditions(hpa *autoscalingv2.HorizontalPodAutoscaler) string {
    var messages []string
    for i := range hpa.Status.Conditions {
        cond := &hpa.Status.Conditions[i]
        if cond.Type != autoscalingv2.ScalingActive {
            continue
        }
        messages = append(messages, cond.Message)
    }
    return fmt.Sprint(messages)
}

// checkScalingCompletion polls the HPA every 10 seconds and sends a
// "scale-up finished" DingTalk notification once its current replica count
// equals its desired replica count.
//
// It is intended to run in its own goroutine. Polling stops after 10 minutes
// so the goroutine cannot run forever when the replica counts never converge
// (for example because pods cannot be scheduled or the HPA was deleted).
// Lookup errors are logged and the next tick retries.
func checkScalingCompletion(hpa *autoscalingv2.HorizontalPodAutoscaler) {
    const maxWait = 10 * time.Minute

    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    deadline := time.NewTimer(maxWait)
    defer deadline.Stop()

    for {
        select {
        case <-deadline.C:
            fmt.Printf("等待 HPA %s/%s 扩容完成超时(%s),停止检查\n", hpa.Namespace, hpa.Name, maxWait)
            return
        case <-ticker.C:
        }

        currentHPA, err := getLatestHPAStatus(hpa.Namespace, hpa.Name)
        if err != nil {
            fmt.Printf("获取 HPA %s/%s 状态失败: %v\n", hpa.Namespace, hpa.Name, err)
            continue
        }

        if currentHPA.Status.CurrentReplicas == currentHPA.Status.DesiredReplicas {
            msg := fmt.Sprintf("HPA 扩容完成:\n"+
                "服务: %s/%s\n"+
                "当前副本数: %d\n"+
                "时间: %s",
                hpa.Namespace, hpa.Name,
                currentHPA.Status.CurrentReplicas,
                time.Now().Format("2006-01-02 15:04:05"))

            sendDingdingMessage(msg, "hpa扩容完成")
            return
        }
    }
}

// getLatestHPAStatus fetches the named HorizontalPodAutoscaler from the API
// server and returns it.
//
// The client configuration is loaded from /root/.kube/config; when that fails
// (for example when running inside a pod) the in-cluster configuration is used.
// Every failure is returned as an error rather than raised as a panic, so a
// caller running in a goroutine can retry without crashing the process.
func getLatestHPAStatus(namespace, name string) (*autoscalingv2.HorizontalPodAutoscaler, error) {
    kubeConfig := "/root/.kube/config"
    // With both arguments empty, BuildConfigFromFlags falls back to the
    // in-cluster configuration.
    config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
    if err != nil {
        config, err = clientcmd.BuildConfigFromFlags("", "")
    }
    if err != nil {
        return nil, fmt.Errorf("building kubernetes client config: %w", err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        return nil, fmt.Errorf("creating kubernetes clientset: %w", err)
    }
    return clientset.AutoscalingV2().HorizontalPodAutoscalers(namespace).Get(context.TODO(), name, v1.GetOptions{})
}

// sendDingdingMessage posts a text message to the DingTalk robot webhook.
//
// content is the message body; eventType is appended to it as a trailing
// "事件类型" line. The request URL carries a millisecond timestamp and the
// signature produced by generateSign. Delivery is best-effort: every failure
// (serialization, transport, non-200 status, or a non-zero errcode in the
// response body) is logged to stdout and the function returns normally.
func sendDingdingMessage(content string, eventType string) {
    // Sign the request.
    timestamp := time.Now().UnixMilli()
    sign := generateSign(timestamp)

    // Build the request URL.
    webhookURL := fmt.Sprintf("%s&timestamp=%d&sign=%s", dingdingWebhook, timestamp, sign)

    // Build the message body.
    msg := DingdingMsg{
        Msgtype: "text",
        Text: struct {
            Content string `json:"content"`
        }{
            Content: content + "\n事件类型: " + eventType,
        },
    }

    jsonBody, err := json.Marshal(msg)
    if err != nil {
        fmt.Println("序列化钉钉消息失败:", err)
        return
    }
    fmt.Println("发送钉钉消息:", msg.Text.Content)

    // Bound the request so a stalled webhook cannot block the caller forever.
    // The nil Transport means the shared default transport is still used.
    client := http.Client{Timeout: 10 * time.Second}
    resp, err := client.Post(webhookURL, "application/json", bytes.NewReader(jsonBody))
    if err != nil {
        fmt.Println("发送钉钉消息失败:", err)
        return
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        fmt.Println("发送钉钉消息失败: HTTP 状态码", resp.StatusCode)
        return
    }

    // The robot API reports application-level failures (bad signature,
    // rate limiting, ...) through errcode/errmsg in the response body.
    var result struct {
        ErrCode int    `json:"errcode"`
        ErrMsg  string `json:"errmsg"`
    }
    if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
        fmt.Println("解析钉钉响应失败:", err)
        return
    }
    if result.ErrCode != 0 {
        fmt.Printf("钉钉返回错误: errcode=%d, errmsg=%s\n", result.ErrCode, result.ErrMsg)
    }
}

// generateSign computes the signature for a DingTalk webhook request.
//
// The signed payload is "<timestamp>\n<secret>", authenticated with
// HMAC-SHA256 keyed by dingdingSecret. The digest is base64-encoded and then
// query-escaped so it can be placed directly into the request URL.
func generateSign(timestamp int64) string {
    mac := hmac.New(sha256.New, []byte(dingdingSecret))
    // Stream the payload straight into the MAC instead of building a string first.
    fmt.Fprintf(mac, "%d\n%s", timestamp, dingdingSecret)

    digest := mac.Sum(nil)
    encoded := base64.StdEncoding.EncodeToString(digest)
    return url.QueryEscape(encoded)
}

// cleanupOldEvents removes every entry from lastScaleEvent whose recorded time
// is more than five minutes before the moment the function was called.
func cleanupOldEvents() {
    const maxAge = 5 * time.Minute

    reference := time.Now()
    for key, recorded := range lastScaleEvent {
        if age := reference.Sub(recorded); age <= maxAge {
            continue
        }
        delete(lastScaleEvent, key)
    }
}

消息如下:

file

构建镜像

# Build stage: compile the program for Linux with CGO disabled, using the
# sources copied from the build context.
FROM golang:1.23 AS builder
WORKDIR /app
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o hpa-watcher .

# Runtime stage: only the compiled binary is copied into the alpine image;
# no kubeconfig file is added to the image.
FROM alpine:3.18
COPY --from=builder /app/hpa-watcher /app/
# Run the watcher as the container's main process.
CMD ["/app/hpa-watcher"]

部署到k8s

# Runs a single replica of the HPA watcher under the "hpa-watcher"
# ServiceAccount defined in this same manifest set.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: hpa-watcher
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hpa-watcher
  template:
    metadata:
      labels:
        app: hpa-watcher
    spec:
      serviceAccountName: hpa-watcher
      containers:
      - name: watcher
        # Placeholder image reference; replace with the registry the image was pushed to.
        image: your-registry/hpa-watcher:v1
        resources:
          limits:
            memory: "128Mi"
            cpu: "100m"

---
# Identity the watcher pod runs as. No namespace is set, so it is created in
# whichever namespace the manifest is applied to.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: hpa-watcher

---
# Grants the "hpa-watcher" ClusterRole to the "hpa-watcher" ServiceAccount.
# NOTE(review): the subject namespace is fixed to "default" while the
# ServiceAccount and Deployment manifests set no namespace; apply them in
# "default" or adjust this value to match.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: hpa-watcher
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: hpa-watcher
subjects:
- kind: ServiceAccount
  name: hpa-watcher
  namespace: default

---
# Read-only access (get, list, watch) to HorizontalPodAutoscalers in the
# "autoscaling" API group.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: hpa-watcher
rules:
- apiGroups: ["autoscaling"]
  resources: ["horizontalpodautoscalers"]
  verbs: ["get", "list", "watch"]
0 0 投票数
文章评分
订阅评论
提醒
guest

0 评论
最旧
最新 最多投票
内联反馈
查看所有评论

相关文章

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部
0
希望看到您的想法,请您发表评论x