面向API server CRUD的探索

前言

某些场景下,我们可能不需要operator的那种运维能力,能对资源CRUD就够了,client-go中的client就是非常不错的选择。

前置知识

GVR、GVK

在整个kubernetes架构中,资源是最重要的概念,可以说k8s的生态系统都围绕资源运作,它本质上是一个资源控制系统:注册、管理、调度资源并维护资源状态。

k8s将资源分组和版本化,形成了:

  • Group: 资源组
  • Version: 资源版本
  • Resource: 资源
  • Kind: 资源种类, 与resource为同级概念

k8s系统支持多个Group,每个Group支持多个Version,每个Version支持多个Resource,其中部分资源会拥有子资源,如Deployment会拥有Status子资源。

资源组、资源版本、资源、子资源的完整表现形式为:<group>/<version>/<resource>/<subresource>

如Deployment资源,完整表现形式为:apps/v1/deployments/status

资源实例化后为一个资源对象,拥有资源组、资源版本、资源种类,表现形式为:<group>/<version>,Kind=<kind>

如Deployment, 完整表现形式为apps/v1,Kind=Deployment

client-go

k8s使用client-go作为go语言官方编程式交互客户端库,提供对k8s API server的交互式访问。

client-go支持4种client对象与k8s交互,分别是ClientSet、DynamicClient、DiscoveryClient、RESTClient

RESTClient是最基础的客户端,它封装了HTTP Request,实现了RESTful风格的API;ClientSet、DynamicClient以及DiscoveryClient都是基于RESTClient实现的。

image

RESTClient

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// main lists up to ten pods in the "default" namespace through a bare
// RESTClient and prints each pod's namespace, name and phase.
func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", "/etc/rancher/k3s/k3s.yaml")
	if err != nil {
		panic(err)
	}

	// A RESTClient is not bound to any API group, so the API root path,
	// the group/version (core v1 here) and the codec are set by hand.
	cfg.APIPath = "api"
	cfg.GroupVersion = &corev1.SchemeGroupVersion
	cfg.NegotiatedSerializer = scheme.Codecs

	client, err := rest.RESTClientFor(cfg)
	if err != nil {
		panic(err)
	}

	// Build the request first, then execute it and decode into a PodList.
	listOpts := &metav1.ListOptions{Limit: 10}
	request := client.Get().
		Namespace("default").
		Resource("pods").
		VersionedParams(listOpts, scheme.ParameterCodec)

	pods := &corev1.PodList{}
	if err := request.Do(context.Background()).Into(pods); err != nil {
		panic(err)
	}

	for i := range pods.Items {
		p := &pods.Items[i]
		fmt.Println(p.Namespace, p.Name, p.Status.Phase)
	}
}

clientset

ClientSet在RESTClient基础上封装了Resource管理方法, 每个Resource可以理解为一个资源crud的客户端,而ClientSet是多个客户端集合,在使用前需要知道要操作资源的Group, Version。需要注意,clientset仅能访问k8s内置资源,无法访问自定义资源CRD,如需访问CRD,需要使用代码生成器重新生成clientset。

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// main lists the pods of every namespace with the typed clientset and prints
// each pod's namespace, name and phase.
func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", "/etc/rancher/k3s/k3s.yaml")
	if err != nil {
		panic(err)
	}

	cs, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// metav1.NamespaceAll is the empty string, i.e. pods of all namespaces.
	pods, err := cs.CoreV1().
		Pods(metav1.NamespaceAll).
		List(context.Background(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}

	for i := range pods.Items {
		p := &pods.Items[i]
		fmt.Println(p.Namespace, p.Name, p.Status.Phase)
	}
}

dynamic client

dynamic client是一种动态客户端,和clientset最大的区别是:clientset不能操作自定义资源,但dynamic client可以,这是因为dynamic client内部使用非结构化数据unstructured,该数据结构能与resource互转。

同时,dynamic client有对应的dynamic informer实现,因此在查询时可以从本地内存缓存获取资源信息。

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

// main lists the pods of every namespace with the dynamic client, converts
// the unstructured result into a typed PodList and prints each pod's
// namespace, name and phase.
func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", "/etc/rancher/k3s/k3s.yaml")
	if err != nil {
		panic(err)
	}

	client, err := dynamic.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// Pods live in the core group, whose group name is the empty string.
	podsGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}

	// metav1.NamespaceAll is the empty string, i.e. all namespaces.
	raw, err := client.
		Resource(podsGVR).
		Namespace(metav1.NamespaceAll).
		List(context.Background(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}

	// Convert the generic map representation back into the typed list.
	var pods corev1.PodList
	err = runtime.DefaultUnstructuredConverter.FromUnstructured(raw.UnstructuredContent(), &pods)
	if err != nil {
		panic(err)
	}

	for i := range pods.Items {
		p := &pods.Items[i]
		fmt.Println(p.Namespace, p.Name, p.Status.Phase)
	}
}

discovery client

discovery client是一个带缓存的资源发现客户端,主要用于发现API server支持的GVR,缓存信息默认存储于~/.kube/cache和~/.kube/http-cache中。

// list resource

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/tools/clientcmd"
)

// main prints every resource the API server advertises, one per line, as
// "<resource> <group> <version>".
func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "/etc/rancher/k3s/k3s.yaml")
	if err != nil {
		panic(err)
	}

	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		panic(err)
	}

	_, resourceLists, err := discovery.ServerGroupsAndResources(discoveryClient)
	if err != nil {
		// Discovery may fail for individual groups while still returning
		// the resources of all the others. Keep that partial result
		// instead of aborting; anything else is fatal.
		if !discovery.IsGroupDiscoveryFailedError(err) {
			panic(err)
		}
		fmt.Println("warning: partial discovery result:", err)
	}

	for _, list := range resourceLists {
		if list == nil {
			continue
		}

		// GroupVersion is "<group>/<version>", or just "<version>" for the
		// core group.
		gv, err := schema.ParseGroupVersion(list.GroupVersion)
		if err != nil {
			panic(err)
		}

		for _, resource := range list.APIResources {
			fmt.Println(resource.Name, gv.Group, gv.Version)
		}
	}
}

best practice

通常, 我们在设计自己的程序时,既想用discovery client的资源发现能力,也想拥有dynamic client对内置资源和自定义资源一致的管理能力,使用RESTmapper可以将两者结合起来:

  • restmapper借助discovery client,实现GVK到GVR的映射;若资源不存在,则映射失败
  • dynamic client使用获取的GVR向API server查询
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"sync"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/discovery/cached/memory"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/restmapper"
	"k8s.io/client-go/tools/clientcmd"
)

func init() {
	mustInitClient()
}

var (
	discoveryClient *discovery.DiscoveryClient
	dynamicClient   dynamic.Interface
	once            sync.Once

	gvk = &schema.GroupVersionKind{
		Version: "v1",
		Kind:    "Pod",
	}
	namespace = "default"
)

// main walks through a create / get / list / delete cycle for a single pod
// using the dynamic resource client, converting between typed corev1 objects
// and unstructured data at the boundaries. Any failure terminates the
// program via log.Fatal.
func main() {
	const podName = "test-pod"

	ctx := context.Background()
	resource, err := getDynamicResource(gvk, namespace)
	if err != nil {
		log.Fatal(err)
	}

	// createPod converts a typed Pod into unstructured form and creates it.
	createPod := func() {
		fmt.Println("creating pod......")
		pod := corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name: podName,
			},
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{
					{
						Name:  "test-busybox",
						Image: "busybox",
					},
				},
			},
		}
		content, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&pod)
		if err != nil {
			log.Fatal(err)
		}
		obj := &unstructured.Unstructured{Object: content}
		// The typed Pod above has an empty TypeMeta, so set apiVersion/kind
		// explicitly to make the submitted object self-describing.
		obj.SetGroupVersionKind(*gvk)
		if _, err := resource.Create(ctx, obj, metav1.CreateOptions{}); err != nil {
			log.Fatal(err)
		}
	}
	createPod()

	// getPod fetches the pod and converts it back into a typed Pod.
	getPod := func() {
		fmt.Println("getting pod......")
		obj, err := resource.Get(ctx, podName, metav1.GetOptions{})
		if err != nil {
			log.Fatal(err)
		}
		var pod corev1.Pod
		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, &pod); err != nil {
			log.Fatal(err)
		}
		fmt.Println(pod.Name)
	}
	getPod()

	// listPods lists the resource and prints namespace and name of each pod.
	listPods := func() {
		fmt.Println("listing pods......")
		objs, err := resource.List(ctx, metav1.ListOptions{})
		if err != nil {
			log.Fatal(err)
		}
		pods := make([]*corev1.Pod, 0, len(objs.Items))
		for i := range objs.Items {
			pod := new(corev1.Pod)
			if err := runtime.DefaultUnstructuredConverter.FromUnstructured(objs.Items[i].Object, pod); err != nil {
				log.Fatal(err)
			}
			pods = append(pods, pod)
		}
		for _, pod := range pods {
			fmt.Println(pod.Namespace, pod.Name)
		}
	}
	listPods()

	// deletePod removes the pod created above.
	deletePod := func() {
		fmt.Println("deleting pod...")
		if err := resource.Delete(ctx, podName, metav1.DeleteOptions{}); err != nil {
			log.Fatal(err)
		}
	}
	deletePod()
}

// mustInitClient initialises the package-level discoveryClient and
// dynamicClient exactly once. The kubeconfig path is taken from the
// KUBECONFIG environment variable when it is set, otherwise from the default
// kubeconfig file in the user's home directory. Any failure terminates the
// program via log.Fatal.
func mustInitClient() {
	once.Do(func() {
		configPath := os.Getenv("KUBECONFIG")
		if configPath == "" {
			// Go does not expand "~", so a literal "~/.kube/config" would
			// be treated as a relative path and never be found.
			// RecommendedHomeFile is the already-resolved default path.
			configPath = clientcmd.RecommendedHomeFile
		}

		config, err := clientcmd.BuildConfigFromFlags("", configPath)
		if err != nil {
			log.Fatal(err)
		}

		discoveryClient, err = discovery.NewDiscoveryClientForConfig(config)
		if err != nil {
			log.Fatal(err)
		}

		dynamicClient, err = dynamic.NewForConfig(config)
		if err != nil {
			log.Fatal(err)
		}
	})
}

// getDynamicResource converts a GVK into a dynamic resource client.
//
// The GVK is resolved to its REST mapping through a discovery-backed
// RESTMapper built on the package-level discoveryClient. If the mapped
// resource is namespace-scoped the returned client is bound to namespace;
// otherwise namespace is ignored.
//
// It returns an error wrapping the mapper's error when the GVK cannot be
// mapped.
func getDynamicResource(gvk *schema.GroupVersionKind, namespace string) (dr dynamic.ResourceInterface, err error) {
	mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(discoveryClient))
	mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if err != nil {
		// Wrap with %w so callers can still inspect the underlying error.
		return nil, fmt.Errorf("mapping %s to a REST resource: %w", gvk, err)
	}

	resourceClient := dynamicClient.Resource(mapping.Resource)
	if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
		return resourceClient.Namespace(namespace), nil
	}
	return resourceClient, nil
}

参考

  • 《kubernetes源码剖析》