基本操作包括Create创建,Update更新,Delete删除,List and Watch查询监听。下面分别介绍每个操作的代码实现。
环境:k8s 1.24.16,go 1.20,本地。
目录结构:
连接apiserver
这里使用Clientset作为客户端,把这个封装成一个文件。如果你要在服务器上测试,需要使用环境变量。
clientSet.go
package pvc
import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"log"
)
func GetClient() (*kubernetes.Clientset, error) {
var kubeConfig = "E:\\client-go\\config"
//kubeconfig := filepath.Join(
// os.Getenv("HOME"), ".kube", "config",
//)
config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
if err != nil {
log.Fatal(err)
}
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatal(err)
}
return clientSet, nil
}
Create
部署带有pvc的nginx测试服务。
nginx.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx
spec:
replicas: 1
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:stable-alpine
volumeMounts:
- name: nginx-storage
mountPath: /usr/share/nginx/html
volumes:
- name: nginx-storage
persistentVolumeClaim:
claimName: nginx-pvc
pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: nginx-pvc
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
这里不能写在一起,因为clientSet的CoreV1()方法不同,一个是Deployments,一个是PersistentVolumeClaims。
deploy.go
package pvc
import (
"context"
_ "embed"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/kubernetes"
"log"
"os"
"strings"
)
func CreatePVC(c *kubernetes.Clientset) {
var pvcYaml = "E:\\client-go\\pvc\\deploy\\pvc.yaml"
bytes, err := os.ReadFile(pvcYaml)
if err != nil {
log.Fatal(err)
}
pvc := corev1.PersistentVolumeClaim{}
err = yaml.Unmarshal(bytes, &pvc)
if err != nil {
log.Fatal(err)
}
_, err = c.CoreV1().PersistentVolumeClaims("default").Create(context.TODO(), &pvc, metav1.CreateOptions{})
if err != nil {
log.Fatal(err)
}
log.Println("create pvc success")
}
//go:embed nginx.yaml
var nginxYaml string
func CreateDeploy(c *kubernetes.Clientset) {
decode := yaml.NewYAMLOrJSONDecoder(strings.NewReader(nginxYaml), 100)
deplyment := appsv1.Deployment{}
err := decode.Decode(&deplyment)
if err != nil {
log.Fatal(err)
}
_, err = c.AppsV1().Deployments("default").Create(context.TODO(), &deplyment, metav1.CreateOptions{})
if err != nil {
log.Fatal(err)
}
log.Println("create deployment success")
}
通过//go:embed
,可以方便地将文件内容嵌入到Go代码中,而无需在运行时读取外部文件。适用于go 1.16+。
这里使用了两种解析yaml文件的方法:一种是使用yaml.Unmarshal
函数,另一种是使用yaml.NewYAMLOrJSONDecoder
结合Decode方法。
-
yaml.Unmarshal
函数:- 可以直接将YAML文件的内容解析为指定的结构体对象。
- 适用于将整个YAML文件内容解析为单个结构体对象的情况。
- 一次性将整个YAML文件内容解析为结构体对象,不支持逐行或逐段解析。
-
yaml.NewYAMLOrJSONDecoder
结合Decode方法:- 可以逐行或逐段地解析YAML文件内容。
- 可以更灵活地控制解析过程,适用于需要逐行或逐段处理YAML文件内容的情况。
- 可以通过多次调用Decode方法来逐步解析YAML文件内容,而不是一次性解析整个文件。
根据具体的需求和场景,你可以选择使用适合的方法来解析YAML文件。如果只需要一次性将整个文件内容解析为结构体对象,可以使用yaml.Unmarshal
函数;如果需要逐行或逐段地处理YAML文件内容,可以使用yaml.NewYAMLOrJSONDecoder
结合Decode方法。
运行代码,输出如下:
List
查询pvc,使用ListOptions
指定label,field,namespace selectors
来缩小pvc列表的范围,这个结果的返回类型是v1.PeristentVolumeClaimList
。并使用模板输出。
给nginx deployment添加app=nginx
的label。
kubectl patch deployment nginx -p '{"spec":{"template":{"metadata":{"labels":{"app":"nginx"}}}}}'
get.go
package pvc
import (
"context"
"flag"
"fmt"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"log"
)
func GetDeploy(c *kubernetes.Clientset, ns string) {
var labels string
flag.StringVar(&labels, "labels", "app=nginx", "Label Selector")
listOptions := metav1.ListOptions{
LabelSelector: labels,
}
deploy, err := c.AppsV1().Deployments(ns).List(context.TODO(), listOptions)
if err != nil {
log.Fatal(err)
}
for _, deploy := range deploy.Items {
fmt.Println(deploy.Name)
}
}
func GetPVC(c *kubernetes.Clientset, ns string) {
var label, field string
flag.StringVar(&ns, "namespace", "default", "Namespace")
flag.StringVar(&label, "label", "", "Label Selector")
flag.StringVar(&field, "field", "", "Field Selector")
listOptions := metav1.ListOptions{
LabelSelector: label,
FieldSelector: field,
}
pvc, err := c.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), listOptions)
if err != nil {
log.Fatal(err)
}
template := "%-16s%-8s%-8s\n"
fmt.Printf(template, "NAME", "STATUS", "CAPACITY")
for _, pvc := range pvc.Items {
quant := pvc.Spec.Resources.Requests[v1.ResourceStorage]
fmt.Printf(template, pvc.Name, string(pvc.Status.Phase), quant.String())
}
}
getPod.go
获取pod详细信息。
im
Delete
delete.go
package pvc
import (
"context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"log"
)
func DeleteDeploy(c *kubernetes.Clientset) {
deployName := "nginx"
err := c.AppsV1().Deployments("default").Delete(context.TODO(), deployName, metav1.DeleteOptions{})
if err != nil {
log.Fatal(err)
}
log.Printf("delete deployment %s success\n", deployName)
}
func DeletePVC(c *kubernetes.Clientset) {
pvcName := "nginx-pvc"
err := c.CoreV1().PersistentVolumeClaims("default").Delete(context.TODO(), pvcName, metav1.DeleteOptions{})
if err != nil {
log.Fatal(err)
}
log.Printf("delete pvc %s success\n", pvcName)
}
Update
修改deployment的副本数为2,并打印更新后的副本数。修改传递整数类型的值,通常会使用指针。
update.go
package pvc
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"log"
)
func Update(c *kubernetes.Clientset) {
deployment, err := c.AppsV1().Deployments("default").Get(context.TODO(), "nginx", metav1.GetOptions{})
if err != nil {
log.Fatal(err)
}
replicas := int32(2)
deployment.Spec.Replicas = &replicas
updatedDeployment, err := c.AppsV1().Deployments("default").Update(context.TODO(), deployment, metav1.UpdateOptions{})
if err != nil {
log.Fatal(err)
}
log.Println("update deployment success")
fmt.Printf("Nginx Deployment replicas updated to: %d\n", *updatedDeployment.Spec.Replicas)
}
Watch
创建一个监听器watcher,监听事件类型,处理pvc的添加,删除,更新事件,设置总的最大quota为20G,在触发对应事件后,代码做对应处理。使用watcher.ResultChan
实现通道,在添加删除时加锁保证安全,pvc的结构体使用sync.map实现。
watch.go
package pvc
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"log"
"sync"
)
var (
totalQuota resource.Quantity
maxQuota = resource.MustParse("20Gi")
lock sync.RWMutex
pvcMap sync.Map
)
func Watch(c *kubernetes.Clientset) {
watcher, err := c.CoreV1().PersistentVolumeClaims("default").Watch(context.TODO(), metav1.ListOptions{})
if err != nil {
log.Fatal(err)
}
defer watcher.Stop()
for event := range watcher.ResultChan() {
pvc, ok := event.Object.(*v1.PersistentVolumeClaim)
if !ok {
log.Fatal("unexpected type")
}
switch event.Type {
case watch.Added:
addHandler(pvc)
case watch.Modified:
modifyHandler(pvc)
case watch.Deleted:
deleteHandler(pvc)
}
}
}
func addHandler(pvc *v1.PersistentVolumeClaim) {
lock.Lock()
defer lock.Unlock()
quant := pvc.Spec.Resources.Requests[v1.ResourceStorage]
totalQuota.Add(quant)
if totalQuota.Cmp(maxQuota) == 1 {
log.Printf("Cannot add PVC: %s, would exceed quota %s\n", pvc.Name, maxQuota.String())
} else {
log.Printf("PVC %s added, totalQuota: %s\n", pvc.Name, totalQuota.String())
}
pvcMap.Store(pvc.Name, pvc)
}
func modifyHandler(pvc *v1.PersistentVolumeClaim) {
fmt.Printf("PVC %s modified\n", pvc.Name)
}
func deleteHandler(pvc *v1.PersistentVolumeClaim) {
lock.Lock()
defer lock.Unlock()
if _, ok := pvcMap.Load(pvc.Name); ok {
quant := pvc.Spec.Resources.Requests[v1.ResourceStorage]
totalQuota.Sub(quant)
pvcMap.Delete(pvc.Name)
log.Printf("PVC %s deleted, totalQuota: %s\n", pvc.Name, totalQuota.String())
}
}
代码运行结果如下:
为什么使用sync.Map
请参考这里
Cmp
CmpInt64 returns 0 if the quantity is equal to y, -1 if the quantity is less than y, or 1 if the quantity is greater than y.
x.Cmp(y),如果x < y,返回-1,如果x = y,返回0,如果x > y,返回1。
Informer
下面的代码是一个使用Informer实现的PVC存储配额管理示例。
package pvc
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"log"
"sync"
"time"
)
var (
// 存储PVC的总存储量,key为"total",value为int64类型。
PVCMap sync.Map
rwLock sync.RWMutex
)
const maxQuotaGB = 20 // 20Gi
func CreateInformer(c *kubernetes.Clientset) {
// 在启动informer之前计算已有PVCs的总存储量
getPVCStorageUsage(c)
// 创建SharedInformerFactory
factory := informers.NewSharedInformerFactoryWithOptions(c, 0, informers.WithNamespace("default"))
pvcInformer := factory.Core().V1().PersistentVolumeClaims().Informer()
informerStartTime := metav1.NewTime(time.Now())
// 设置事件处理器
_, err := pvcInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pvc := obj.(*v1.PersistentVolumeClaim)
// 去掉创建时间早于informer启动时间的PVC,即只有informer启动后的ADD事件才触发
if pvc.CreationTimestamp.Before(&informerStartTime) {
return
}
StorageUsage(pvc, true)
//fmt.Println("PVC added")
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldPvc := oldObj.(*v1.PersistentVolumeClaim)
newPvc := newObj.(*v1.PersistentVolumeClaim)
updatetorageUsage(oldPvc, newPvc)
},
DeleteFunc: func(obj interface{}) {
pvc := obj.(*v1.PersistentVolumeClaim)
StorageUsage(pvc, false)
//fmt.Println("PVC deleted")
},
},
)
if err != nil {
log.Fatal(err)
}
// 启动informer
stopCh := make(chan struct{})
defer close(stopCh)
factory.Start(stopCh)
//factory.WaitForCacheSync(stopCh)
// 等待Informer同步。这个函数会阻塞,直到所有的Informer都同步完成,或者stopCh被关闭。
if !cache.WaitForCacheSync(stopCh, pvcInformer.HasSynced) {
log.Fatal("Timed out waiting for caches to sync")
}
// 保持主goroutine运行
select {}
}
// getPVCStorageUsage 用于计算已有PVC的存储量,并将其存储在PVCMap中。
func getPVCStorageUsage(c *kubernetes.Clientset) {
pvcs, err := c.CoreV1().PersistentVolumeClaims("default").List(context.TODO(), metav1.ListOptions{})
if err != nil {
log.Fatal(err)
}
var total int64
for _, pvc := range pvcs.Items {
quantity := pvc.Spec.Resources.Requests[v1.ResourceStorage]
quantityGB, _ := quantity.AsInt64()
total += quantityGB
}
PVCMap.Store("total", total)
fmt.Printf("Initial total storage usage: %d GiB\n", total/1024/1024/1024)
}
// StorageUsage 用于更新PVC的存储使用情况。根据add参数判断是添加PVC还是删除PVC,然后根据PVC请求的存储大小来更新当前总使用量,并将其存储到PVCMap中。
func StorageUsage(pvc *v1.PersistentVolumeClaim, add bool) {
rwLock.Lock()
defer rwLock.Unlock()
// 获取PVC请求的存储大小
quota := pvc.Spec.Resources.Requests[v1.ResourceStorage]
quotaGB, _ := quota.AsInt64()
//fmt.Printf("quotaGB: %d GiB\n", quotaGB/1024/1024/1024)
// 从sync.Map获取当前总使用量
total, _ := PVCMap.LoadOrStore("total", 0)
currentTotal := total.(int64)
//fmt.Printf("current total storage usage: %d GiB\n", currentTotal/1024/1024/1024)
if add {
// 如果超出最大容量,打印告警日志
if (currentTotal+quotaGB)/1024/1024/1024 > maxQuotaGB {
currentTotal += quotaGB
PVCMap.Store("total", currentTotal)
fmt.Printf("PVC %s added, would exceed quota %d GiB, current total storage usage: %d GiB\n", pvc.Name, maxQuotaGB, currentTotal/1024/1024/1024)
} else {
currentTotal += quotaGB
PVCMap.Store("total", currentTotal)
fmt.Printf("PVC %s added, current total storage usage: %d GiB\n", pvc.Name, currentTotal/1024/1024/1024)
}
} else {
currentTotal -= quotaGB
PVCMap.Store("total", currentTotal)
fmt.Printf("PVC %s deleted, current total storage usage: %d GiB\n", pvc.Name, currentTotal/1024/1024/1024)
}
fmt.Printf("current total storage usage: %d GiB\n", currentTotal/1024/1024/1024)
}
// updatetorageUsage 函数用于在PVC更新时更新存储使用情况。它会比较PVC的旧存储大小和新存储大小,如果不相等,则通过读写锁更新当前总使用量,并将其存储到PVCMap中。
func updatetorageUsage(oldPvc, newPvc *v1.PersistentVolumeClaim) {
oldQuantity := oldPvc.Spec.Resources.Requests[v1.ResourceStorage]
newQuantity := newPvc.Spec.Resources.Requests[v1.ResourceStorage]
oldSizeInGiB, _ := oldQuantity.AsInt64()
newSizeInGiB, _ := newQuantity.AsInt64()
if oldSizeInGiB != newSizeInGiB {
rwLock.Lock()
defer rwLock.Unlock()
total, _ := PVCMap.Load("total")
totalGB := total.(int64)
totalGB = totalGB - oldSizeInGiB + newSizeInGiB
PVCMap.Store("total", totalGB)
fmt.Printf("Updated total storage usage after PVC update: %d GiB\n", totalGB/1024/1024/1024)
}
}
坑
1.informer启动前已有的pvc会触发ADD事件
这个通过添加informer启动时间,判断pvc创建时间是否大于该时间解决。if pvc.CreationTimestamp.Before(&informerStartTime)
2.创建pvc后要控制台没有打印added日志
我在多次测试启动informer后,每次创建pvc时,都要等10秒左右,控制台才能打印出added日志,最开始我以为没有同步完成,我添加了cache.WaitForCacheSync
,检测是否同步完成。但是添加之后问题依旧。上面已经判断了pvc的创建时间,pvc也已经创建成功了,并且是在default namespace下。最后对比本地电脑的时间和服务器时间,发现相差了10秒。问题解决....
Exec
execOne.go
进入单个pod执行命令。
package pvc
import (
"context"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
"log"
"os"
)
func ExceOne(c *kubernetes.Clientset) {
cmd := []string{"ls", "-l", "/etc/nginx/nginx.conf"}
request := c.CoreV1().RESTClient().Post().
Resource("pods").
Namespace("default").
Name("nginx-6945b88ffd-wb2hz").
SubResource("exec").
VersionedParams(&v1.PodExecOptions{
Container: "nginx",
Command: cmd,
Stdin: false,
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)
var kubeConfig = "E:\\client-go\\config"
config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
if err != nil {
log.Fatal(err)
}
exec, err := remotecommand.NewSPDYExecutor(config, "POST", request.URL())
if err != nil {
log.Fatal(err)
}
err = exec.StreamWithContext(context.TODO(), remotecommand.StreamOptions{
//Stdin: os.Stdin,
Stdout: os.Stdout,
Stderr: os.Stderr,
})
if err != nil {
log.Fatal(err)
}
}
涉及到的几个字段:
func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request
# VersionedParams 将获取提供的对象,使用隐式 RESTClient API 版本和默认参数编解码器将其序列化为 map[string][]string,然后将它们作为参数添加到请求中。使用它来提供客户端库中的版本化查询参数。
var ParameterCodec runtime.ParameterCodec = runtime.NewParameterCodec(Scheme)
# scheme 是用来处理版本化的资源和API操作的。当你需要与apiserver交互,尤其是需要指定资源版本或执行特定版本的操作时,scheme 提供了必要的编解码器。scheme.ParameterCodec 用于编码 v1.PodExecOptions 到请求参数中。
func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error)
# NewSPDYExecutor 是用来创建一个执行器(Executor),该执行器能够通过 SPDY 协议与apiserver通信。SPDY 是一种由Google开发的网络传输协议,它被设计来传输Web内容,并优化性能。在Kubernetes中,SPDY 协议被用于支持如 exec、attach、port-forward 等操作的实时流。
func (Executor) StreamWithContext(ctx context.Context, options StreamOptions) error
# StreamWithContext 方法用于在给定的上下文中启动流式传输。这个方法允许你指定输入/输出流,以便可以实时地发送命令到Pod并接收输出。这对于执行需要实时交互的命令非常有用。
execMutli.go
进入多个pod执行命令。
package pvc
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
"log"
"os"
"sync"
)
func ExceMulti(c *kubernetes.Clientset) {
var kubeConfig = "E:\\client-go\\config"
config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
if err != nil {
log.Fatal(err)
}
pods, err := c.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
if err != nil {
log.Fatal(err)
}
var wg sync.WaitGroup
for _, pod := range pods.Items {
wg.Add(1)
go func(pod v1.Pod) {
defer wg.Done()
fmt.Printf("Executing command on pod: %s\n", pod.Name)
cmd := []string{"hostname"}
req := c.CoreV1().RESTClient().Post().
Resource("pods").
Namespace("default").
Name(pod.Name).
SubResource("exec").
VersionedParams(&v1.PodExecOptions{
Command: cmd,
Stdin: false,
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)
//fmt.Printf("req: %s\n", req.URL())
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
log.Fatal(err)
}
err = exec.StreamWithContext(context.TODO(), remotecommand.StreamOptions{
Stdout: os.Stdout,
Stderr: os.Stderr,
})
if err != nil {
log.Fatal(err)
}
}(pod)
}
wg.Wait()
}
其中request.URL()格式是:
https://ip:6443/api/v1/namespaces/default/pods/nginx-6945b88ffd-wb2hz/exec?command=echo&command=hello%2Cworld&stderr=true&stdout=true
main.go
package main
import (
"fmt"
client "pvc/client"
exe "pvc/exec"
)
func main() {
clientSet, err := client.GetClient()
if err != nil {
fmt.Printf("failed to get clientset: %v", err)
}
//dynamicClient, err := client.DynamicClient()
//if err != nil {
// fmt.Printf("failed to get dynamic client: %v", err)
//}
//fmt.Println("Create PersistentVolumeClaim:")
//deploy.CreatePVC(clientSet)
//fmt.Println("Create Deployment:")
//deploy.CreateDeploy(clientSet)
//var ns = "default"
//fmt.Println("List Deployments:")
//get.GetDeploy(client, ns)
//fmt.Println("List PersistentVolumeClaims:")
//get.GetPVC(clientSet, ns)
//get.GetPod(clientSet)
//fmt.Println("Delete Deployment:")
//delete.DeleteDeploy(clientSet)
//fmt.Println("Delete PersistentVolumeClaim:")
//delete.DeletePVC(clientSet)
//fmt.Println("Update Deployment:")
//update.Update(clientSet)
//watch.Watch(clientSet)
//watch.CreateInformer(clientSet)
//exe.ExceOne(clientSet)
exe.ExceMulti(clientSet)
}
常用api操作
pod
//声明pod对象
var pod *v1.Pod
//创建pod
pod, err := clientset.CoreV1().Pods(<namespace>).Create(<pod>)
//更新pod
pod, err := clientset.CoreV1().Pods(<namespace>).Update(<pod>)
//删除pod
err := clientset.CoreV1().Pods(<namespace>).Delete(<pod.Name>, &meta_v1.DeleteOptions{})
//查询pod
pod, err := clientset.CoreV1().Pods(<namespace>).Get(<pod.Name>, meta_v1.GetOptions{})
//列出pod
podList, err := clientset.CoreV1().Pods(<namespace>).List(&meta_v1.ListOptions{})
//watch pod
watchInterface, err := clientset.CoreV1().Pods(<namespace>).Watch(&meta_v1.ListOptions{})
svc
//声明service对象
var service *v1.Service
//构造service对象
//创建service
service, err := clientset.CoreV1().Services(<namespace>).Create(<service>)
//更新service
service, err := clientset.CoreV1().Services(<namespace>).Update(<service>)
//删除service
err := clientset.CoreV1().Services(<namespace>).Delete(<service.Name>, &meta_v1.DeleteOptions{})
//查询service
service, err := clientset.CoreV1().Services(<namespace>).Get(<service.Name>, meta_v1.GetOptions{})
//列出service
serviceList, err := clientset.CoreV1().Services(<namespace>).List(&meta_v1.ListOptions{})
//watch service
watchInterface, err := clientset.CoreV1().Services(<namespace>).Watch(&meta_v1.ListOptions{})
deployment
//声明deployment对象
var deployment *v1beta1.Deployment
//构造deployment对象
//创建deployment
deployment, err := clientset.AppsV1beta1().Deployments(<namespace>).Create(<deployment>)
//更新deployment
deployment, err := clientset.AppsV1beta1().Deployments(<namespace>).Update(<deployment>)
//删除deployment
err := clientset.AppsV1beta1().Deployments(<namespace>).Delete(<deployment.Name>, &meta_v1.DeleteOptions{})
//查询deployment
deployment, err := clientset.AppsV1beta1().Deployments(<namespace>).Get(<deployment.Name>, meta_v1.GetOptions{})
//列出deployment
deploymentList, err := clientset.AppsV1beta1().Deployments(<namespace>).List(&meta_v1.ListOptions{})
//watch deployment
watchInterface, err := clientset.AppsV1beta1().Deployments(<namespace>).Watch(&meta_v1.ListOptions{})
请问我在window上执行exec.StreamWithContext会有unable to upgrade connection: you must specify at least 1 of stdin, stdout, stderr报错是怎么回事
你没有指定输出吧