go, k8s

Scheduling Pods to Nodes Based on Labels

Background

The services on the cluster are already deployed, and a test environment is now needed, but on the same machines: production and test share a single cluster. That means the production and test services have to be separated and run on different nodes. With so many services, editing each one to add a nodeSelector, a nodeName, or node affinity would be far too much work.

I first tried the PodNodeSelector admission plugin. Official docs: https://kubernetes.io/zh-cn/docs/reference/access-authn-authz/admission-controllers/#podnodeselector
First, enable the plugin on every master:

vim /etc/kubernetes/manifests/kube-apiserver.yaml
- --enable-admission-plugins=NodeRestriction,PodNodeSelector

After the kube-apiserver restarts on each master, add an annotation to the test namespace:

k edit ns test
annotations:
  scheduler.alpha.kubernetes.io/node-selector: env=test

Then label the node:

k label node kube-node01 env=test

After this, pods in the test namespace that are restarted, as well as pods newly created in that namespace, are all scheduled to kube-node01, and each pod picks up an extra node selector env=test:

k describe po -n test nginx-68dc7b7d54-qvwhq | grep env
Node-Selectors:              env=test

Testing revealed a problem, though: new pods in the annotated namespace do get scheduled to the designated nodes, but nothing stops pods from other namespaces from being scheduled onto those same nodes. In other words, pods in the default namespace may still land on kube-node01, which does not meet the requirement.
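
An alternative described in the linked docs, but not tested here: PodNodeSelector also accepts an admission configuration file with a clusterDefaultNodeSelector, so namespaces without their own annotation would get a default selector (for example env=prod) and stay off the test nodes, provided the production nodes are labeled accordingly. A sketch of the two files referenced through the apiserver's --admission-control-config-file flag, with hypothetical selector values:

# admission-config.yaml, passed to --admission-control-config-file
apiVersion: apiserver.config.k8s.io/v1
kind: AdmissionConfiguration
plugins:
- name: PodNodeSelector
  path: podnodeselector.yaml
---
# podnodeselector.yaml
podNodeSelectorPluginConfig:
  clusterDefaultNodeSelector: env=prod
  test: env=test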

Solution

Custom scheduler

So a custom scheduler is needed: a pod carrying the corresponding env label should be scheduled randomly onto a node with a matching label, and if no node matches, it should be scheduled randomly among the remaining non-master nodes.

I wrote a scheduler and applied it to a Deployment, but scheduling failed, because a pod's nodeName field cannot be modified by a regular update. The scheduler code is below.

package main

import (
        "context"
        "fmt"
        "math/rand"
        "time"

        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/rest"
        "k8s.io/client-go/tools/clientcmd"
)

func main() {
        config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
        if err != nil {
                config, err = rest.InClusterConfig()
                if err != nil {
                        panic(err.Error())
                }
        }

        clientset, err := kubernetes.NewForConfig(config)
        if err != nil {
                panic(err.Error())
        }

        for {
                schedulePods(clientset)
                time.Sleep(1 * time.Second)
        }
}

func schedulePods(clientset *kubernetes.Clientset) {
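        // List pending pods (spec.nodeName still empty) labeled env=test across all namespaces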
        pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
                FieldSelector: "spec.nodeName=",
                LabelSelector: "env=test",
        })
        if err != nil {
                panic(err.Error())
        }

        for _, pod := range pods.Items {
                // List all nodes
                nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
                if err != nil {
                        panic(err.Error())
                }

                // Pick the nodes whose env label matches the pod's
                var matchingNodes []corev1.Node
                for _, node := range nodes.Items {
                        if node.Labels["env"] == pod.Labels["env"] {
                                matchingNodes = append(matchingNodes, node)
                        }
                }

                // If there are matching nodes, pick one at random
                if len(matchingNodes) > 0 {
                        chosenNode := matchingNodes[rand.Intn(len(matchingNodes))]
                        fmt.Printf("Scheduling pod %s to node %s\n", pod.Name, chosenNode.Name)
                        patchPodToNode(clientset, pod.Name, pod.Namespace, chosenNode.Name)
                        continue
                }

                // If no node matches, pick a random non-master node
                var nonMasterNodes []corev1.Node
                for _, node := range nodes.Items {
                        if _, isMaster := node.Labels["node-role.kubernetes.io/master"]; !isMaster {
                                nonMasterNodes = append(nonMasterNodes, node)
                        }
                }

                if len(nonMasterNodes) > 0 {
                        chosenNode := nonMasterNodes[rand.Intn(len(nonMasterNodes))]
                        fmt.Printf("Scheduling pod %s to non-master node %s\n", pod.Name, chosenNode.Name)
                        patchPodToNode(clientset, pod.Name, pod.Namespace, chosenNode.Name)
                }
        }
}

func patchPodToNode(clientset *kubernetes.Clientset, podName, namespace, nodeName string) {
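        // Try to set spec.nodeName with a merge patch; the apiserver rejects this kind of pod update, which is why the scheduling attempt fails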
        patch := []byte(fmt.Sprintf(`{"spec": {"nodeName": "%s"}}`, nodeName))
        _, err := clientset.CoreV1().Pods(namespace).Patch(context.TODO(), podName, types.MergePatchType, patch, metav1.PatchOptions{})
        if err != nil {
                fmt.Printf("Failed to schedule pod %s: %v\n", podName, err)
        }
}

And the RBAC it needs:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: custom-scheduler-sa
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: custom-scheduler-role
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["list", "watch", "patch"]
- apiGroups: [""]
  resources: ["nodes"]
  verbs: ["list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: custom-scheduler-rolebinding
subjects:
- kind: ServiceAccount
  name: custom-scheduler-sa
roleRef:
  kind: Role
  name: custom-scheduler-role
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: custom-scheduler-clusterrole
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["list", "watch", "patch"]
- apiGroups: [""]
  resources: ["nodes"]
  verbs: ["list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: custom-scheduler-clusterrolebinding
subjects:
- kind: ServiceAccount
  name: custom-scheduler-sa
  namespace: default
roleRef:
  kind: ClusterRole
  name: custom-scheduler-clusterrole
  apiGroup: rbac.authorization.k8s.io

It looks like this would require building on the kube-scheduler source itself, which I have not attempted.
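
For reference, the way kube-scheduler itself (and custom schedulers built on client-go) assigns a pending pod is not by patching spec.nodeName but by creating a Binding through the pods/binding subresource. A minimal sketch of what could replace patchPodToNode above, assuming the same imports and clientset, that the RBAC additionally grants create on pods/binding, and that the target pods set spec.schedulerName so the default scheduler leaves them alone:

// bindPodToNode assigns a pending pod to a node via the pods/binding
// subresource instead of updating the immutable spec.nodeName field.
func bindPodToNode(clientset *kubernetes.Clientset, podName, namespace, nodeName string) {
        binding := &corev1.Binding{
                ObjectMeta: metav1.ObjectMeta{
                        Name:      podName,
                        Namespace: namespace,
                },
                Target: corev1.ObjectReference{
                        Kind: "Node",
                        Name: nodeName,
                },
        }
        if err := clientset.CoreV1().Pods(namespace).Bind(context.TODO(), binding, metav1.CreateOptions{}); err != nil {
                fmt.Printf("Failed to bind pod %s to node %s: %v\n", podName, nodeName, err)
        }
}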

Mutating webhook

Another approach is to use a mutating webhook to modify the scheduling constraints. When a create or update request reaches the apiserver, mutating webhooks are invoked during admission, so the object can be modified (here, by injecting node affinity into the Deployment's pod template) before kube-scheduler ever sees the pods. The concrete steps follow.

First, write the webhook code:

package main

import (
    "encoding/json"
    "fmt"
    "io"
    admissionv1 "k8s.io/api/admission/v1"
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "log"
    "net/http"
)

func handleMutate(w http.ResponseWriter, r *http.Request) {
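    // handleMutate receives an AdmissionReview for a Deployment and responds with a JSONPatch that injects node affinity into the pod template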
    // Read the webhook request body
    body, err := io.ReadAll(r.Body)
    if err != nil {
        // Return a 500 error
        http.Error(w, "could not read request body", http.StatusInternalServerError)
        fmt.Printf("could not read request body: %v\n", err)
        return
    }
    fmt.Println("body:", string(body))

    var admissionReview admissionv1.AdmissionReview
    // Unmarshal the request body into an AdmissionReview object
    if err := json.Unmarshal(body, &admissionReview); err != nil {
        // Return a 400 error
        http.Error(w, "could not unmarshal request body", http.StatusBadRequest)
        fmt.Printf("could not unmarshal request body: %v\n", err)
        return
    }

    deployment := appsv1.Deployment{}
    // Extract the Deployment object from the AdmissionReview request
    if err := json.Unmarshal(admissionReview.Request.Object.Raw, &deployment); err != nil {
        // Return a 400 error
        http.Error(w, "could not unmarshal deployment object", http.StatusBadRequest)
        fmt.Printf("could not unmarshal deployment object: %v\n", err)
        return
    }
    // Read the env label from the Deployment's pod template
    env := deployment.Spec.Template.Labels["env"]
    patch := ""
    // Choose the affinity patch based on the env label
    if env != "" {
        patch = getAffinityPatch()
    } else {
        patch = getNonMasterNodePatch()
    }

    fmt.Println("patch:", patch)
    // Build the AdmissionResponse and attach the generated patch
    admissionResponse := admissionv1.AdmissionResponse{
        UID:       admissionReview.Request.UID,
        Allowed:   true,
        Patch:     []byte(patch),
        PatchType: func() *admissionv1.PatchType { pt := admissionv1.PatchTypeJSONPatch; return &pt }(),
    }
    // Set the response on the AdmissionReview object
    admissionReview.Response = &admissionResponse
    // Marshal the AdmissionReview to JSON
    respBytes, err := json.Marshal(admissionReview)
    if err != nil {
        http.Error(w, "could not marshal response", http.StatusInternalServerError)
        fmt.Printf("could not marshal response: %v\n", err)
        return
    }
    fmt.Println("response:", string(respBytes))
    // Set the response header
    w.Header().Set("Content-Type", "application/json")
    // Write the response body
    w.Write(respBytes)
}

// getAffinityPatch builds a nodeAffinity patch that forces pods onto nodes labeled env=test
func getAffinityPatch() string {
    affinity := corev1.Affinity{
        NodeAffinity: &corev1.NodeAffinity{
            RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
                NodeSelectorTerms: []corev1.NodeSelectorTerm{
                    {
                        MatchExpressions: []corev1.NodeSelectorRequirement{
                            {
                                Key:      "env",
                                Operator: corev1.NodeSelectorOpIn,
                                Values:   []string{"test"},
                            },
                        },
                    },
                },
            },
        },
    }
    // Marshal the affinity object to JSON
    affinityBytes, _ := json.Marshal(affinity)
    // Return the JSONPatch
    return fmt.Sprintf(`[{"op":"add","path":"/spec/template/spec/affinity","value":%s}]`, string(affinityBytes))
}

// getNonMasterNodePatch builds a nodeAffinity patch that prefers nodes that are neither masters nor labeled env=test
func getNonMasterNodePatch() string {
    affinity := corev1.Affinity{
        NodeAffinity: &corev1.NodeAffinity{
            PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{
                {
                    Weight: 1,
                    Preference: corev1.NodeSelectorTerm{
                        MatchExpressions: []corev1.NodeSelectorRequirement{
                            {
                                Key:      "node-role.kubernetes.io/master",
                                Operator: corev1.NodeSelectorOpDoesNotExist,
                            },
                            {
                                Key:      "env",
                                Operator: corev1.NodeSelectorOpNotIn,
                                Values:   []string{"test"},
                            },
                        },
                    },
                },
            },
        },
    }
    affinityBytes, _ := json.Marshal(affinity)
    return fmt.Sprintf(`[{"op":"add","path":"/spec/template/spec/affinity","value":%s}]`, string(affinityBytes))
}

func main() {
    // Register handleMutate as the webhook handler at /mutate
    http.HandleFunc("/mutate", handleMutate)
    fmt.Println("Listening on port 443...")
    // Listen on port 443 with the TLS certificate and key
    log.Fatal(http.ListenAndServeTLS(":443", "tls.crt", "tls.key", nil))
}
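
Before building the image, the handler can be smoke-tested locally without a cluster by posting a hand-built AdmissionReview to it. A minimal sketch, assuming it is saved as main_test.go in the same package as the webhook code above (the Deployment and UID values are made up):

package main

import (
    "bytes"
    "encoding/json"
    "io"
    "net/http"
    "net/http/httptest"
    "testing"

    admissionv1 "k8s.io/api/admission/v1"
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
)

func TestHandleMutate(t *testing.T) {
    // A Deployment whose pod template carries env=test
    deploy := appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{Name: "nginx"},
        Spec: appsv1.DeploymentSpec{
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"env": "test"}},
            },
        },
    }
    rawDeploy, _ := json.Marshal(deploy)

    // Wrap it in an AdmissionReview request, the way the apiserver would
    review := admissionv1.AdmissionReview{
        Request: &admissionv1.AdmissionRequest{
            UID:    types.UID("test-uid"),
            Object: runtime.RawExtension{Raw: rawDeploy},
        },
    }
    reqBody, _ := json.Marshal(review)

    // Serve handleMutate over plain HTTP for the test
    srv := httptest.NewServer(http.HandlerFunc(handleMutate))
    defer srv.Close()

    resp, err := http.Post(srv.URL, "application/json", bytes.NewReader(reqBody))
    if err != nil {
        t.Fatal(err)
    }
    defer resp.Body.Close()
    respBody, _ := io.ReadAll(resp.Body)

    var out admissionv1.AdmissionReview
    if err := json.Unmarshal(respBody, &out); err != nil {
        t.Fatalf("bad response %s: %v", respBody, err)
    }
    if out.Response == nil || !out.Response.Allowed || len(out.Response.Patch) == 0 {
        t.Fatalf("expected an allowed response with a patch, got: %s", respBody)
    }
    t.Logf("patch: %s", out.Response.Patch)
}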

The webhook needs a TLS certificate, and generating one by hand is tedious, so cert-manager is used to issue the certificate and inject the CA automatically. Reference: https://deploy-preview-408--cert-manager.netlify.app/docs/concepts/ca-injector/

Create a configuration file conf.yaml with CA injection enabled; the webhook is applied whenever a Deployment is created or updated.

apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  name: mutating-webhook
  annotations:
    cert-manager.io/inject-ca-from: test/webhook-cert
webhooks:
  - name: mutate.webhook.com
    clientConfig:
      service:
        name: webhook-service
        namespace: test
        path: "/mutate"
    rules:
      - operations: ["CREATE", "UPDATE"]
        apiGroups: ["apps"]
        apiVersions: ["v1"]
        resources: ["deployments"]
    admissionReviewVersions: ["v1"]
    sideEffects: None
    timeoutSeconds: 5
---
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
  name: webhook-cert
spec:
  secretName: webhook-tls
  dnsNames:
    - webhook-service.test.svc
  issuerRef:
    name: selfsigned-cluster-issuer
    kind: Issuer
---
apiVersion: cert-manager.io/v1
kind: Issuer
metadata:
  name: selfsigned-cluster-issuer
spec:
  selfSigned: {}

k apply -f conf.yaml -n test
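
As written, the webhook intercepts Deployments in every namespace, and failurePolicy defaults to Fail; that is also why the configuration has to be deleted temporarily before the webhook-server itself can be deployed (see below). To narrow the scope, the webhook entry could additionally set failurePolicy and a namespaceSelector. A sketch of fields that could be added under the webhook entry, assuming the cluster labels namespaces with kubernetes.io/metadata.name (v1.21+):

    failurePolicy: Ignore
    namespaceSelector:
      matchExpressions:
        - key: kubernetes.io/metadata.name
          operator: NotIn
          values: ["kube-system", "cert-manager"]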

Applying this generates a Secret; export the crt and key from it:

k get secret webhook-tls -o jsonpath='{.data.tls\.crt}' -n test | base64 --decode > tls.crt
k get secret webhook-tls -o jsonpath='{.data.tls\.key}' -n test | base64 --decode > tls.key

Create the Dockerfile.

FROM registry.cn-hangzhou.aliyuncs.com/wgh9626/alpine:latest
WORKDIR /app
COPY . .
ENTRYPOINT ["./webhook-server"]
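
The Dockerfile only copies the current directory, so the webhook-server binary (together with the exported tls.crt and tls.key) has to be there already. A static build keeps the binary runnable on the alpine base image; assuming the webhook source above is in the current directory:

CGO_ENABLED=0 GOOS=linux go build -o webhook-server .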

Build the image.

nerdctl build -t webhook-server:5.0 .

Export the image and copy it to the node, or push it to an image registry (the ctr import runs on the node):

nerdctl save -o webhook-server.tar webhook-server:5.0
scp webhook-server.tar kube-node01:/root
ctr -n k8s.io i import webhook-server.tar

Delete the MutatingWebhookConfiguration first, otherwise creating the webhook-server Deployment will fail: the webhook intercepts its own Deployment while the backing service is not running yet, and failurePolicy defaults to Fail.

k delete MutatingWebhookConfiguration mutating-webhook

Create deploy.yaml and start the webhook-server:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: webhook-server
  labels:
    app: webhook-server
spec:
  replicas: 1
  selector:
    matchLabels:
      app: webhook-server
  template:
    metadata:
      labels:
        app: webhook-server
    spec:
      containers:
      - name: webhook-server
        image: webhook-server:5.0
        ports:
        - containerPort: 443
---
apiVersion: v1
kind: Service
metadata:
  name: webhook-service
spec:
  ports:
  - port: 443
    targetPort: 443
  selector:
    app: webhook-server

k apply -f deploy.yaml -n test

Re-create the MutatingWebhookConfiguration:

k apply -f conf.yaml -n test
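
To confirm that cert-manager's cainjector has filled in the CA bundle (with an empty caBundle the apiserver cannot verify the webhook-server's certificate), check something like:

k get mutatingwebhookconfiguration mutating-webhook -o jsonpath='{.webhooks[0].clientConfig.caBundle}'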

Label the nodes:

k label node kube-node01 env=test
k label node kube-node02 env=test

Testing

Create noenv.yaml to test a Deployment without the env label:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-noenv
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:stable-alpine

k apply -f noenv.yaml

The pod is scheduled to kube-node03, which has no env=test label and is not a master.

The generated patch and the AdmissionReview response are printed in the webhook-server log.

Create env.yaml to test a Deployment that carries the env label:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nginx
      env: test
  template:
    metadata:
      labels:
        app: nginx
        env: test
    spec:
      containers:
      - name: nginx
        image: nginx:stable-alpine

k apply -f env.yaml -n test

The pod is scheduled to kube-node02, which carries the env=test label.

Inspecting the Deployment shows that the nodeAffinity has been injected.

For the existing services, changing the Deployment name (or otherwise triggering an update, since the webhook also fires on UPDATE) is enough to get them re-scheduled. With that, the requirement described above is met.
