Background
The services are already deployed on the cluster, and now a test environment is needed on the same machines, i.e. production and test share one cluster. That means the production and test workloads have to be separated onto different nodes. With so many services, editing each one to add a nodeSelector, a nodeName, or node affinity would be far too much work.
I first tried the PodNodeSelector admission plugin. Official docs: https://kubernetes.io/zh-cn/docs/reference/access-authn-authz/admission-controllers/#podnodeselector
First, enable the plugin on every master:
vim /etc/kubernetes/manifests/kube-apiserver.yaml
- --enable-admission-plugins=NodeRestriction,PodNodeSelector
After the kube-apiserver pods restart, add the annotation to the test namespace:
k edit ns test
annotations:
  scheduler.alpha.kubernetes.io/node-selector: env=test
Then label the node:
k label node kube-node01 env=test
After that, restarted pods and newly created pods in the test namespace are all scheduled onto kube-node01, and each pod picks up the node selector env=test:
k describe po -n test nginx-68dc7b7d54-qvwhq | grep env
Node-Selectors: env=test
Testing revealed a problem, though: pods in the annotated namespace are indeed scheduled onto the designated nodes, but nothing keeps pods from other namespaces off those nodes. In other words, pods in the default namespace may still land on kube-node01, so this does not meet the requirement.
Solution
Setting nodeName
The idea is to set the pod's nodeName field: a pod carrying an env label is placed on a random node with a matching label, and if no node matches, it is placed on a random non-master node. Below is the scheduler code; it was built into an image and run as a Deployment, but the scheduling failed, because a pod's nodeName field cannot be modified once the pod has been scheduled. It has to be set before scheduling, and my patch runs afterwards, so a real custom scheduler is needed.
A custom scheduler requires a KubeSchedulerConfiguration, plus code implementing the scheduling plugins enabled there, and the pod's spec.schedulerName field must match the schedulerName of a KubeSchedulerProfile. I have not implemented this yet; a rough sketch of the bind step is shown after the RBAC manifests below.
https://kubernetes.io/zh-cn/docs/tasks/extend-kubernetes/configure-multiple-schedulers/
https://kubernetes.io/zh-cn/docs/reference/scheduling/config/
https://github.com/kubernetes/kubernetes/tree/master/pkg/scheduler
package main
import (
    "context"
    "fmt"
    "math/rand"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)
func main() {
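// Use the local kubeconfig when available; fall back to the in-cluster config when running inside the cluster.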
config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
if err != nil {
config, err = rest.InClusterConfig()
if err != nil {
panic(err.Error())
}
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
for {
schedulePods(clientset)
time.Sleep(1 * time.Second)
}
}
func schedulePods(clientset *kubernetes.Clientset) {
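// List pending pods (empty spec.nodeName) that carry the env=test label.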
pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
FieldSelector: "spec.nodeName=",
LabelSelector: "env=test",
})
if err != nil {
panic(err.Error())
}
for _, pod := range pods.Items {
// Get all nodes
nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
panic(err.Error())
}
// Pick the nodes whose env label matches the pod's
var matchingNodes []corev1.Node
for _, node := range nodes.Items {
if node.Labels["env"] == pod.Labels["env"] {
matchingNodes = append(matchingNodes, node)
}
}
// If there are matching nodes, pick one at random
if len(matchingNodes) > 0 {
chosenNode := matchingNodes[rand.Intn(len(matchingNodes))]
fmt.Printf("Scheduling pod %s to node %s\n", pod.Name, chosenNode.Name)
patchPodToNode(clientset, pod.Name, pod.Namespace, chosenNode.Name)
continue
}
// Otherwise pick a random non-master node
var nonMasterNodes []corev1.Node
for _, node := range nodes.Items {
if _, isMaster := node.Labels["node-role.kubernetes.io/master"]; !isMaster {
nonMasterNodes = append(nonMasterNodes, node)
}
}
if len(nonMasterNodes) > 0 {
chosenNode := nonMasterNodes[rand.Intn(len(nonMasterNodes))]
fmt.Printf("Scheduling pod %s to non-master node %s\n", pod.Name, chosenNode.Name)
patchPodToNode(clientset, pod.Name, pod.Namespace, chosenNode.Name)
}
}
}
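// patchPodToNode tries to set spec.nodeName with a merge patch; this is the call that failed in practice (see above).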
func patchPodToNode(clientset *kubernetes.Clientset, podName, namespace, nodeName string) {
patch := []byte(fmt.Sprintf(`{"spec": {"nodeName": "%s"}}`, nodeName))
_, err := clientset.CoreV1().Pods(namespace).Patch(context.TODO(), podName, types.MergePatchType, patch, metav1.PatchOptions{})
if err != nil {
fmt.Printf("Failed to schedule pod %s: %v\n", podName, err)
}
}
And here is the RBAC it needs:
apiVersion: v1
kind: ServiceAccount
metadata:
  name: custom-scheduler-sa
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: custom-scheduler-role
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["list", "watch", "patch"]
- apiGroups: [""]
  resources: ["nodes"]
  verbs: ["list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: custom-scheduler-rolebinding
subjects:
- kind: ServiceAccount
  name: custom-scheduler-sa
roleRef:
  kind: Role
  name: custom-scheduler-role
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: custom-scheduler-clusterrole
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["list", "watch", "patch"]
- apiGroups: [""]
  resources: ["nodes"]
  verbs: ["list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: custom-scheduler-clusterrolebinding
subjects:
- kind: ServiceAccount
  name: custom-scheduler-sa
  namespace: default
roleRef:
  kind: ClusterRole
  name: custom-scheduler-clusterrole
  apiGroup: rbac.authorization.k8s.io
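For reference, below is a rough, untested sketch of what the bind step of such a custom scheduler could look like. Instead of patching spec.nodeName, a scheduler assigns a node by creating a Binding through the pods/binding subresource (which would also require the create verb on pods/binding in the RBAC above). The scheduler name my-custom-scheduler is an arbitrary placeholder, and the snippet reuses the imports from the program above.
// bindPod assigns a pending pod to a node the way kube-scheduler does:
// it creates a Binding via the pods/binding subresource rather than
// patching spec.nodeName on the pod object.
func bindPod(clientset *kubernetes.Clientset, pod corev1.Pod, nodeName string) error {
    binding := &corev1.Binding{
        ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
        Target:     corev1.ObjectReference{Kind: "Node", Name: nodeName},
    }
    return clientset.CoreV1().Pods(pod.Namespace).Bind(context.TODO(), binding, metav1.CreateOptions{})
}

// The scheduling loop would then only pick up pods that ask for this scheduler, e.g.
//   FieldSelector: "spec.schedulerName=my-custom-scheduler,spec.nodeName="
// and the Deployments to be handled would set spec.schedulerName: my-custom-scheduler
// in their pod template.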
MutatingWebhook
Another approach is a mutating webhook that injects the scheduling constraints (node affinity in this case). When the create or update request for a Deployment reaches the apiserver, the mutating admission webhook is invoked before the object is persisted, i.e. before kube-scheduler ever sees the resulting pods, so the pod template can be modified to meet the requirement. The concrete steps follow.
First, write the webhook code:
package main
import (
    "encoding/json"
    "fmt"
    "io"
    "log"
    "net/http"

    admissionv1 "k8s.io/api/admission/v1"
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
)
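// handleMutate receives AdmissionReview requests for Deployments and returns an AdmissionResponse whose JSON patch injects node affinity into the pod template.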
func handleMutate(w http.ResponseWriter, r *http.Request) {
// Read the webhook request body
body, err := io.ReadAll(r.Body)
if err != nil {
// Return a 500 error
http.Error(w, "could not read request body", http.StatusInternalServerError)
fmt.Printf("could not read request body: %v\n", err)
return
}
fmt.Println("body:", string(body))
var admissionReview admissionv1.AdmissionReview
// Unmarshal the request body into an AdmissionReview object
if err := json.Unmarshal(body, &admissionReview); err != nil {
// Return a 400 error
http.Error(w, "could not unmarshal request body", http.StatusBadRequest)
fmt.Printf("could not unmarshal request body: %v\n", err)
return
}
deployment := appsv1.Deployment{}
// Extract the Deployment object from the AdmissionReview request
if err := json.Unmarshal(admissionReview.Request.Object.Raw, &deployment); err != nil {
// Return a 400 error
http.Error(w, "could not unmarshal deployment object", http.StatusBadRequest)
fmt.Printf("could not unmarshal deployment object: %v\n", err)
return
}
// Read the env label from the Deployment's pod template
env := deployment.Spec.Template.Labels["env"]
patch := ""
// Choose the affinity patch based on the env label
if env != "" {
patch = getAffinityPatch()
} else {
patch = getNonMasterNodePatch()
}
fmt.Println("patch:", patch)
// Build the AdmissionResponse and attach the generated patch
admissionResponse := admissionv1.AdmissionResponse{
UID: admissionReview.Request.UID,
Allowed: true,
Patch: []byte(patch),
PatchType: func() *admissionv1.PatchType { pt := admissionv1.PatchTypeJSONPatch; return &pt }(),
}
// Set the AdmissionReview's response to admissionResponse
admissionReview.Response = &admissionResponse
// Marshal the AdmissionReview back to JSON
respBytes, err := json.Marshal(admissionReview)
if err != nil {
http.Error(w, "could not marshal response", http.StatusInternalServerError)
fmt.Printf("could not marshal response: %v\n", err)
return
}
fmt.Println("response:", string(respBytes))
// Set the response header
w.Header().Set("Content-Type", "application/json")
// Write the response body
w.Write(respBytes)
}
// getAffinityPatch builds a NodeAffinity patch that pins pods to nodes labeled env=test
func getAffinityPatch() string {
affinity := corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "env",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"test"},
},
},
},
},
},
},
}
// Marshal the affinity object to JSON
affinityBytes, _ := json.Marshal(affinity)
// Return the JSON patch
return fmt.Sprintf(`[{"op":"add","path":"/spec/template/spec/affinity","value":%s}]`, string(affinityBytes))
}
// getNonMasterNodePatch builds a NodeAffinity patch that prefers non-master nodes that are not labeled env=test
func getNonMasterNodePatch() string {
affinity := corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{
{
Weight: 1,
Preference: corev1.NodeSelectorTerm{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "node-role.kubernetes.io/master",
Operator: corev1.NodeSelectorOpDoesNotExist,
},
{
Key: "env",
Operator: corev1.NodeSelectorOpNotIn,
Values: []string{"test"},
},
},
},
},
},
},
}
affinityBytes, _ := json.Marshal(affinity)
return fmt.Sprintf(`[{"op":"add","path":"/spec/template/spec/affinity","value":%s}]`, string(affinityBytes))
}
func main() {
// Register handleMutate as the handler for the /mutate path
http.HandleFunc("/mutate", handleMutate)
fmt.Println("Listening on port 443...")
// Listen on port 443 with the TLS certificate and key
log.Fatal(http.ListenAndServeTLS(":443", "tls.crt", "tls.key", nil))
}
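Before building the image, the handler can be exercised locally with a small unit test. The sketch below is an assumption of mine (placed in a main_test.go in the same package): it feeds handleMutate a fake AdmissionReview wrapping a Deployment whose pod template is labeled env=test, and checks that an affinity patch comes back.
package main

import (
    "bytes"
    "encoding/json"
    "net/http"
    "net/http/httptest"
    "testing"

    admissionv1 "k8s.io/api/admission/v1"
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
)

func TestHandleMutateAddsAffinity(t *testing.T) {
    // A Deployment whose pod template carries env=test.
    deploy := appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{Name: "nginx"},
        Spec: appsv1.DeploymentSpec{
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "nginx", "env": "test"}},
            },
        },
    }
    raw, _ := json.Marshal(deploy)

    // Wrap it in an AdmissionReview request, as the apiserver would.
    review := admissionv1.AdmissionReview{
        Request: &admissionv1.AdmissionRequest{
            UID:    types.UID("test-uid"),
            Object: runtime.RawExtension{Raw: raw},
        },
    }
    body, _ := json.Marshal(review)

    req := httptest.NewRequest(http.MethodPost, "/mutate", bytes.NewReader(body))
    rec := httptest.NewRecorder()
    handleMutate(rec, req)

    if rec.Code != http.StatusOK {
        t.Fatalf("unexpected status: %d", rec.Code)
    }
    var resp admissionv1.AdmissionReview
    if err := json.Unmarshal(rec.Body.Bytes(), &resp); err != nil {
        t.Fatalf("could not decode response: %v", err)
    }
    if resp.Response == nil || !resp.Response.Allowed {
        t.Fatalf("request was not allowed")
    }
    // The patch should pin the pod template to nodes labeled env=test.
    if !bytes.Contains(resp.Response.Patch, []byte(`"env"`)) {
        t.Fatalf("expected env affinity in patch, got: %s", resp.Response.Patch)
    }
}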
The webhook needs a TLS certificate, and generating one by hand is tedious, so cert-manager is used to issue the certificate and inject the CA bundle automatically. Reference: https://deploy-preview-408--cert-manager.netlify.app/docs/concepts/ca-injector/
Create the config file conf.yaml: it enables CA injection and registers the webhook for Deployment create and update requests.
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  name: mutating-webhook
  annotations:
    cert-manager.io/inject-ca-from: test/webhook-cert
webhooks:
- name: mutate.webhook.com
  clientConfig:
    service:
      name: webhook-service
      namespace: test
      path: "/mutate"
  rules:
  - operations: ["CREATE", "UPDATE"]
    apiGroups: ["apps"]
    apiVersions: ["v1"]
    resources: ["deployments"]
  admissionReviewVersions: ["v1"]
  sideEffects: None
  timeoutSeconds: 5
---
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
  name: webhook-cert
spec:
  secretName: webhook-tls
  dnsNames:
  - webhook-service.test.svc
  issuerRef:
    name: selfsigned-cluster-issuer
    kind: Issuer
---
apiVersion: cert-manager.io/v1
kind: Issuer
metadata:
  name: selfsigned-cluster-issuer
spec:
  selfSigned: {}
k apply -f conf.yaml -n test
Applying it produces a Secret; export the certificate and key from it:
k get secret webhook-tls -o jsonpath='{.data.tls\.crt}' -n test | base64 --decode > tls.crt
k get secret webhook-tls -o jsonpath='{.data.tls\.key}' -n test | base64 --decode > tls.key
Create the Dockerfile:
FROM registry.cn-hangzhou.aliyuncs.com/wgh9626/alpine:latest
WORKDIR /app
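# Everything in the build context is copied in: the pre-built webhook-server binary plus tls.crt and tls.key for the TLS listener.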
COPY . .
ENTRYPOINT ["./webhook-server"]
Build the image:
nerdctl build -t webhook-server:5.0 .
Export the image and copy it to the node (or push it to an image registry), then import it on the node:
nerdctl save -o webhook-server.tar webhook-server:5.0
scp webhook-server.tar kube-node01:/root
ctr -n k8s.io i import webhook-server.tar
Delete the MutatingWebhookConfiguration first, otherwise creating the webhook-server will fail (the webhook would intercept its own Deployment while the server is not yet running):
k delete MutatingWebhookConfiguration mutating-webhook
Create deploy.yaml and start the webhook-server:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: webhook-server
  labels:
    app: webhook-server
spec:
  replicas: 1
  selector:
    matchLabels:
      app: webhook-server
  template:
    metadata:
      labels:
        app: webhook-server
    spec:
      containers:
      - name: webhook-server
        image: webhook-server:5.0
        ports:
        - containerPort: 443
---
apiVersion: v1
kind: Service
metadata:
  name: webhook-service
spec:
  ports:
  - port: 443
    targetPort: 443
  selector:
    app: webhook-server
k apply -f deploy.yaml -n test
Then re-create the MutatingWebhookConfiguration:
k apply -f conf.yaml -n test
Label the nodes:
k label node kube-node01 env=test
k label node kube-node02 env=test
Testing
Create noenv.yaml to test a Deployment without the env label:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-noenv
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:stable-alpine
k apply -f noenv.yaml
The pod lands on kube-node03, a non-master node without the env=test label. The generated patch and the AdmissionReview response can be seen in the webhook server's logs.
Next, create env.yaml to test a Deployment that carries the env=test label:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nginx
      env: test
  template:
    metadata:
      labels:
        app: nginx
        env: test
    spec:
      containers:
      - name: nginx
        image: nginx:stable-alpine
k apply -f env.yaml -n test
This time the pod is scheduled onto kube-node02, which carries the env=test label, and inspecting the Deployment shows that the nodeAffinity has been injected.
For the existing services, simply changing each Deployment's name so that its pods get rescheduled is enough; with that, the requirement described above is met.