面向API server CRUD的探索
前言
某些场景下,我们可能不需要operator的那种运维能力,能对资源CRUD就够了,client-go中的client就是非常不错的选择。
前置知识
GVR、GVK
在整个kubernetes架构中,资源是最重要的概念,可以说k8s的生态系统都围绕资源运作,它本质上是一个资源控制系统:注册、管理、调度资源并维护资源状态。
k8s将资源分组和版本化,形成了:
Group
: 资源组Version
: 资源版本Resource
: 资源Kind
: 资源种类, 与resource为同级概念
k8s系统支持多个Group,每个Group支持多个Version,每个Version支持多个Resource,其中部分资源会拥有子资源,如Deployment会拥有Status子资源。
资源组、资源版本、资源、子资源的完整表现形式为:<group>/<version>/<resource>/<subresource>
如Deployment资源,完整表现形式为:apps/v1/deployments/status
资源实例化后为一个资源对象,拥有资源组、资源版本、资源种类,表现形式为:<group>/<version>,Kind=<kind>
如Deployment, 完整表现形式为apps/v1,Kind=Deployment
client-go
k8s使用client-go作为go语言官方编程式交互客户端库,提供对k8s API server的交互式访问。
client-go支持4种client对象与k8s交互,分别是ClientSet、DynamicClient、DiscoveryClient、RESTClient
RESTClient最基础,它封装了HTTPRequest,实现了RESTful风格的API,ClientSet、DynamicClinet以及DiscoveryClient都是基于RESTfulClient实现的。
RESTClient
package main
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", "/etc/rancher/k3s/k3s.yaml")
if err != nil {
panic(err)
}
config.APIPath = "api"
// group core version v1
config.GroupVersion = &corev1.SchemeGroupVersion
// set codec
config.NegotiatedSerializer = scheme.Codecs
restClient, err := rest.RESTClientFor(config)
if err != nil {
panic(err)
}
result := &corev1.PodList{}
ctx := context.Background()
err = restClient.Get().
Namespace("default").
Resource("pods").
VersionedParams(&metav1.ListOptions{Limit: 10}, scheme.ParameterCodec).
Do(ctx).
Into(result)
if err != nil {
panic(err)
}
for _, pod := range result.Items {
fmt.Println(pod.Namespace, pod.Name, pod.Status.Phase)
}
}
clientset
ClientSet在RESTClient基础上封装了Resource管理方法, 每个Resource可以理解为一个资源crud的客户端,而ClientSet是多个客户端集合,在使用前需要知道要操作资源的Group, Version。需要注意,clientset仅能访问k8s内置资源,无法访问自定义资源CRD,如需访问CRD,需要使用代码生成器重新生成clientset。
package main
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", "/etc/rancher/k3s/k3s.yaml")
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
ctx := context.Background()
// all namepsaces pods
podClient := clientset.CoreV1().Pods("")
list, err := podClient.List(ctx, metav1.ListOptions{})
if err != nil {
panic(err)
}
for _, pod := range list.Items {
fmt.Println(pod.Namespace, pod.Name, pod.Status.Phase)
}
}
dynamic client
dynamic client是一种动态客户端,和clientset最大的区别是:clientset不能操作自定义资源,但dynamic client可以,这是因为dynamic client内部使用非结构化数据unstructured,该数据结构能与resource互转。
同时,dynamic client有对应的dynamic informer实现,因此在查询时可以从本地内存缓存获取资源信息。
package main
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", "/etc/rancher/k3s/k3s.yaml")
if err != nil {
panic(err)
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
panic(err)
}
ctx := context.Background()
gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
// all namespaces
unstructuredObjs, err := dynamicClient.Resource(gvr).Namespace("").List(ctx, metav1.ListOptions{})
if err != nil {
panic(err)
}
podList := &corev1.PodList{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredObjs.UnstructuredContent(), podList); err != nil {
panic(err)
}
for _, pod := range podList.Items {
fmt.Println(pod.Namespace, pod.Name, pod.Status.Phase)
}
}
discovery client
discovery client是一个带缓存的资源发现客户端,主要用于发现API server支持的GVR,缓存信息默认存储于~/.kube/cache
和~/.kube/http-cache中
。
// list resource
package main
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", "/etc/rancher/k3s/k3s.yaml")
if err != nil {
panic(err)
}
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
panic(err)
}
_, APIResourceLIst, err := discovery.ServerGroupsAndResources(discoveryClient)
if err != nil {
panic(err)
}
for _, list := range APIResourceLIst {
gv, err := schema.ParseGroupVersion(list.GroupVersion)
if err != nil {
panic(err)
}
for _, resource := range list.APIResources {
fmt.Println(resource.Name, gv.Group, gv.Version)
}
}
}
best practice
通常, 我们在设计自己的程序时,既想用discovery client的资源发现能力,也想拥有dynamic client对内置资源和自定义资源一致的管理能力,使用RESTmapper可以将两者结合起来:
- restmapper借助discovery client,实现GVK映射GVR,若资源不存在映射失败
- dynamic client使用获取的GVR向API server查询
package main
import (
"context"
"fmt"
"log"
"os"
"sync"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
)
func init() {
mustInitClient()
}
var (
discoveryClient *discovery.DiscoveryClient
dynamicClient dynamic.Interface
once sync.Once
gvk = &schema.GroupVersionKind{
Version: "v1",
Kind: "Pod",
}
namespace = "default"
)
func main() {
ctx := context.Background()
resource, err := getDynamicResource(gvk, namespace)
if err != nil {
log.Fatal(err)
}
create := func() {
fmt.Println("creating pod......")
pod := corev1.Pod{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test-busybox",
Image: "busybox",
},
},
},
}
obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&pod)
if err != nil {
log.Fatal(err)
}
var unstructured unstructured.Unstructured
unstructured.Object = obj
if _, err := resource.Create(ctx, &unstructured, metav1.CreateOptions{}); err != nil {
log.Fatal(err)
}
}
create()
get := func() {
fmt.Println("getting pod......")
unstructured, err := resource.Get(ctx, "test-pod", metav1.GetOptions{})
if err != nil {
log.Fatal(err)
}
var pod corev1.Pod
err = runtime.DefaultUnstructuredConverter.
FromUnstructured(unstructured.Object, &pod)
if err != nil {
log.Fatal(err)
}
fmt.Println(pod.Name)
}
get()
list := func() {
fmt.Println("listing pods......")
unstructuredList, err := resource.List(ctx, metav1.ListOptions{})
if err != nil {
log.Fatal(err)
}
var podList []*corev1.Pod
for _, unstructured := range unstructuredList.Items {
var pod corev1.Pod
err := runtime.DefaultUnstructuredConverter.
FromUnstructured(unstructured.Object, &pod)
if err != nil {
log.Fatal(err)
}
podList = append(podList, &pod)
}
for _, pod := range podList {
fmt.Println(pod.Namespace, pod.Name)
}
}
list()
delete := func(){
fmt.Println("deleting pod...")
if err := resource.Delete(ctx, "test-pod", metav1.DeleteOptions{}); err != nil {
panic(err)
}
}
delete()
}
func mustInitClient() {
once.Do(func() {
var configPath string
if value := os.Getenv("KUBECONFIG"); value != "" {
configPath = value
} else {
configPath = "~/.kube/config"
}
config, err := clientcmd.BuildConfigFromFlags("", configPath)
if err != nil {
log.Fatal(err)
}
discoveryClient, err = discovery.NewDiscoveryClientForConfig(config)
if err != nil {
log.Fatal(err)
}
dynamicClient, err = dynamic.NewForConfig(config)
if err != nil {
log.Fatal(err)
}
})
}
// getDynamicResource convert GVK into dynamic resource
func getDynamicResource(gvk *schema.GroupVersionKind, namespace string) (dr dynamic.ResourceInterface, err error) {
mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(discoveryClient))
mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, fmt.Errorf("CRD has not been registed, err: %s", err)
}
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
dr = dynamicClient.Resource(mapping.Resource).Namespace(namespace)
} else {
dr = dynamicClient.Resource(mapping.Resource)
}
return
}
参考
- 《kubernetes源码剖析》