Kubernetes 系统使用 client-go 作为 Go 语言的官方编程式交互客户端库,提供对 Kubernetes API Server 服务的交互访问。client-go 的源码路径为 vendor/k8s.io/client-go。
client-go 源码结构
| 源码目录 | 说明 |
|---|---|
| discovery | 提供 DiscoveryClient 发现客户端 |
| dynamic | 提供 DynamicClient 动态客户端 |
| informers | 每种 Kubernetes 资源的 Informer 实现 |
| kubernetes | 提供 ClientSet 客户端 |
| listers | 为每一个 Kubernetes 资源提供 Lister 功能,该功能对 Get 和 List 请求提供只读的缓存数据 |
| plugin | 提供 OpenStack 、GCP 和 Azure 等云服务商授权插件 |
| rest | 提供 RESTClient 客户端,对 Kubernetes API Server 执行 RESTful 操作 |
| scale | 提供 ScaleClient 客户端,用于扩容或缩容 Deployment,ReplicaSet,Replication Controller 等资源对象 |
| tools | 提供常用工具,例如 SharedInformer,Reflector,DealtFIFO 及 Indexers。提供 Client 查询和缓存机制,以减少向 kube-apiserver 发起的请求数等 |
| transport | 提供安全的 TCP 连接,支持 Http Stream,某些操作需要在客户端和容器之间传输二进制流,例如 exec,attach 等操作,该功能由内部的 spdy 包提供支持 |
| util | 提供常用方法,例如 WorkQueue 工作队列,Certificate 证书管理等 |
Client 客户端对象
RESTClient 是最基础的客户端,RESTClient 对 HTTP Request 进行了封装,实现了 RESTful 风格的 API。ClientSet,DynamicClient 及 DiscoveryClient 客户端都是基于 RESTClient 实现的。
ClientSet 在 RESTClient 的基础上封装了对 Resource 和 Version 的管理方法,每一个 Resource 可以理解为一个客户端,而 ClientSet 则是多个客户端的集合,每一个 Resource 和 Version 都以函数的方式暴露给开发者。ClientSet 只能够处理 Kubernetes 内置资源,它是通过 client-gen 代码生成器自动生成的。
DynamicClient 与 ClientSet 最大的不同之处是,ClientSet 仅能访问 Kubernetes 自带的资源,不能直接访问 CRD 自定义资源,DynamicClient 能够处理 Kubernetes 中的所有资源对象,包括 Kubernetes 内置资源与 CRD 自定义资源。
DiscoveryClient 发现客户端,用于发现 kube-apiserver 所支持的资源组、资源版本、资源信息。
以上 4 中客户端都可以通过 kubeconfig 配置信息连接到指定的 Kubernetes API Server。
kubeconfig 配置管理
kubeconfig 用于管理访问 kube-apiserver 的配置信息,同时也支持访问多 kube-apiserver 的配置管理,可以再不同的环境下管理不同的 kube-apiserver 集群配置,不同业务线也可以拥有不同的集群,kubernetes 的其他组件都使用 kubeconfig 配置信息来连接 kube-apiserver 组件的。
Kubeconfig 配置信息如下:
$ cat ~/.kube/config
apiVersion: v1
kind: Config
preferences: {}
clusters:
- cluster:
name: dev-cluster
users:
- name: dev-user
contexts:
- context
name: dev-context
- clusters: 定义 Kubernetes 集群信息,例如 kube-apiserver 的服务地址及集群的证书信息等。
- users: 定义 Kubernetes 集群用户身份验证的客户端凭据,例如 client-certificate、client-key、token及username/password 等。
- contests: 定义 Kubernetes 集群用户信息和命名空间等,用于将请求发送到指定的集群。
client-go 会读取 kubeconfig 配置信息并生成 config 对象,用于与 kube-apiserver 通信,代码示例如下:
func main() {
config, err := clientcmd.BuildConfigFromFlags("", "~/.kube/config")
if err != nil {
panic(err)
}
...
}
上述代码中,clientcmd.BuildConfigFromFlags 函数会读取 kubeconfig 配置信息并实例化 rest.Config 对象,其中 kubeconfig 最核心的功能是管理多个访问 kube-apiserver 集群的配置信息,将多个配置信息合并成一份,在合并过程中会解决多个配置文件字段冲突的问题,该过程由 Load 函数完成,可以分为两步:第一步,加载 kubeconfig 配置信息;第二步,合并多个 kubeconfig 配置信息。代码示例如下:
- 加载 kubeconfig 配置信息
func (rules *ClientConfigLoadingRules) Load() (*clientcmdapi.Config, error) {
...
kubeConfigFiles := []string{}
...
if len(rules.ExplicitPath) > 0 {
...
kubeConfigFiles = append(kubeConfigFiles, rules.ExplicitPath)
} else {
kubeConfigFiles = append(kubeConfigFiles, rules.Precedence...)
}
for _, filename := range kubeConfigFiles {
...
config, err := LoadFromFile(filename)
...
kubeconfigs = append(kubeconfigs, config)
}
...
}
从上可知,有以上两种方式可以获取 kubeconfig 配置信息路径:第一种,文件路径(即 rules.ExplicitPath);第二种,环境变量(通过 KUBECONFIG 变量,即 rules.Precedence,可指定多个路径)。最后将配置信息汇总到 kubeConfigFiles 中,这两种方式都通过 LoadFromFile 函数读取数据并把读取到的数据反序列化到 Config 对象中。代码示例如下:
func Load(data []byte) (*clientcmdapi.Config, error) {
config := clientcmdapi.NewConfig()
...
decoded, _, err := clientcmdlatest.Codec.Decode(data, &schema.GroupVersionKind{Version: clientcmdlatest.Version, Kind: "Config"}, config)
...
return decoded.(*clientcmdapi.Config), nil
}
- 合并多个 kubeconfig 配置信息
代码示例如下:
config := clientcmdapi.NewConfig()
mergo.MergeWithOverwrite(config, mapConfig)
mergo.MergeWithOverwrite(config, nonMapConfig)
mergo.MergeWithOverwrite 函数将 src 字段填充到 dst 结构中,私有字段除外,非空的 dst 字段将被覆盖,另外 dst 和 src 必须拥有有效的相同类型结构。
RESTClient 客户端
它具有很高的灵活性,数据不依赖于方法和资源,因此 RESTClient 能够处理多种类型的调用,返回不同的数据格式。
RESTClient Example 代码示例如下:
func main() {
config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
if err != nil {
panic(err)
}
config.APIPath = "api"
config.GroupVersion = &corev1.SchemeGroupVersion
config.NegotiatedSerializer = scheme.Codecs
restClient, err := rest.RESTClientFor(config)
// 通过 kubeconfig 配置信息实例化 RESTClient 对象。
if err != nil {
panic(err)
}
result := &corev1.PodList{}
err = restClient.Get().Namespace("default").Resource("pods").VersionedParams(&metav1.ListOptions{Limit:500}, scheme.ParameterCodec).Do().Into(result)
// RESTClient 对象构建 HTTP 请求参数。例如 GET, POST, PUT, DELETE, PATCH 等
// VersionedParams 函数将一些查询选项添加到请求参数中
if err != nil {
panic(err)
}
for _, d := range result.Items {
fmt.Printf("NAMESPACE:%v \t NAME: %v \t STATU: %+v \n", d.Namespace, d.Name, d.Status.Phase)
}
}
以上代码列出 defult 命名空间下的所有 Pod 资源对象的相关信息。首先加载 kubeconfig 配置信息,并设置 config.APIPath 请求的 HTTP 路径,然后设置 config.GroupVersion 请求的资源组/资源版本。最后设置 config.NegotiatedSerializer 数据的编码器。
RESTClient 发送请求的过程对 Go 语言标准库 net/http 进行了封装,由 Do -> request 函数实现,代码示例如下:
func (r *Request) Do() Result {
...
var result Result
err := r.request(func(req *http.Request, resp *http.Response) {
result = r.transformResponse(resp, req)
})
}
func (r *Request) request(fn func(*http.Request, *http.Response)) error {
...
for {
url := r.URL().String()
// 生成请求的 RESTful URL
req, err := http.NewRequest(r.verb, url, r.body)
// 通过 Go 语言标准库 net/http 向 RESTful URL(即 kube-apiserver)发送请求。
if err != nil {
return err
}
...
req.Header = r.headers
...
resp, err := client.Do(req)
...
if err != nil {
if !net.IsConnectionReset(err) || r.verb != "GET" {
return err
}
resp = &http.Response{
StatusCode: http.StatusInternalServerError,
Header: http.Header{"Retry-After": []string{"1"}},
Body: iotil.NopCloser(bytes.NewReader([]byte{})),
// 请求得到的结果存放在 http.Respose 的 Body 中
}
}
...
resp.Body.Close()
// 函数退出时,会通过此命令进行关闭,防止内存溢出
...
fn(req, resp)
// 将结果转换为资源对象
...
}
}
ClientSet 客户端
RESTClient 是一种最基础的客户端,使用时需要制定 Resource 和 Version 等信息,相比于 RESTClient,ClientSet 使用起来更加便捷。
- ClientSet 在 RESTClient 的基础上封装了对 Resource 和 Version 的管理方法,每一个 Resource 可以理解为一个客户端,而 ClientSet 则是多个客户端的集合,每一个 Resource 和 Version 都以函数的方法暴露给开发者。
ClientSet 仅能访问 Kubernetes 自身内置的资源,不能直接访问 CRD 自定义资源,如果需要访问 CRD 自定义资源,可以通过 client-gen 代码生成器重新生成 ClientSet,在 ClientSet 集合中自动生成与 CRD 操作相关的接口。
ClientSet Example 代码示例如下:
func main() {
config, err != clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
podClient := clientset.CoreV1().Pods(apiv1.NamespaceDefault)
list, err := podClient.List(metav1.ListOptions{Limit:500})
if err != nil {
panic(err)
}
for _, d := range list.Items {
fmt.Printf("NAMESPACE:%v \t NAME: %v \t STATU: %+v \n", d.Namespace, d.Name, d.Status.Phase)
}
}
以上代码列出 defult 命名空间下的所有 Pod 资源对象的相关信息。首先加载 kubeconfig 配置信息,kubernetes.NewForConfig 通过 kubeconfig 配置信息实例化 clientset 对象,该对象用于管理所有 Resource 的客户端。
clientset.CoreV1().Pods 函数表示请求 core 核心资源组的 v1 资源版本下的 Pod 资源对象,其内部设置了 APIPath 请求的 HTTP 路径,GroupVersion 请求的资源组、资源版本,NegotiatedSerializer 数据的编码器。
其中 Pods 函数是一个资源接口对象,用于 Pod 资源对象的管理,例如对 Pod 资源执行 Create,Update,Delete,Get,List,Watch,Patch等操作,这些操作实际上是对 RESTClient 进行了封装,可以设置选项(如 Limit,TimeoutSeconds等)。
podClient.List 函数通过 RESTClient 获得 Pod 列表,代码如下:
func (c *pods) List (opts metav1.ListOptions) (result *v1.PodList, err error) {
...
result = &v1.PodList{}
err = c.client.Get().Namespace(c.ns).Resource("pods").VersionedParams(&opts, scheme.ParameterCodec).Timeout(timeout).Do().Into(result)
return
}
DynamicClient 客户端
它可以对任意 Kubernetes 资源进行 RESTful 操作,包括 CRD 自定义资源,与 ClientSet 操作类似,同样封装了 RESTClient,同样提供了Create,Update,Delete,Get,List,Watch,Patch等方法。
ClientSet 需要预先实现每种 Resource 和 Version 的操作,其内部的数据都是结构化数据(即已知数据结构)。而 DynamicClient 内部实现了 Unstructured,用于处理非结构化数据结构(即无法提前预知数据结构),这也是其能处理 CRD 自定义资源的关键。
DynamincClient 不是类型安全的,因此在访问 CRD 自定义资源时需要特别注意,例如,在操作指针不当的情况下可能会导致程序崩溃。
DynamicClient 的处理过程将 Resource(例如PodList)转换成 Unstructured 结构类型,Kubernetes 的所有 Resource 都可以转换为该结构类型。处理完成后,再将 Unstructured 转换为 PodList。
整个过程类似于 Go 语言的 interface{} 断言转换过程。另外,Unstructured 结构类型是通过 map[string]interface{} 转换的。
类似 kubectl 命令,DynamincClient Example 代码示例如下:
func main() {
config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
if err != nil {
panic(err)
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
panic(err)
}
gvr := schema.GroupVersionResource{Version:"v1", Resource:"pods"}
unstructObj, err := dynamicClient.Resource(gvr).Namespace(apiv1.NamespaceDefault).List(metav1.ListOptions{Limit: 500})
// dynamicClient.Resource(gvr) 函数用于设置请求的资源组、资源版本、资源名称。
// Namespace 函数用于设置请求的命名空间
// List 函数用于获取 Pod 列表(Pod列表为 unstructured.UnstructuredList 指针类型)
if err != nil {
panic(err)
}
podList := &corev1.PodList{}
err = runtime.DefaultUnstructuredConverter.FromUstructured(unstructObj.UnstructuredContent(), podList)
// runtime.DefaultUnstructuredConverter.FromUstructured 函数将 unstructured.UnstructuredList 转换成 PodList 类型。
if err != nil {
panic(err)
}
for _, d := range podList.Items {
fmt.Printf("NAMESPACE:%v \t NAME: %v \t STATU: %+v \n", d.Namespace, d.Name, d.Status.Phase)
}
}
以上代码列出 defult 命名空间下的所有 Pod 资源对象的相关信息。首先加载 kubeconfig 配置信息,dynamic.NewForConfig 通过 kubeconfig 配置信息实例化 dynamicClient 对象,该对象用于管理 Kubernetes 的所有 Resource 的客户端,例如对 Resource 执行Create,Update,Delete,Get,List,Watch,Patch等操作。
DiscoveryClient 客户端
它主要用于发现 Kubernetes API Server 所支持的资源组、资源版本、资源信息。
kubectl 的 api-versions 和 api-resources 命令输出也是通过 DiscoveryClient 实现的。
DiscoveryClient 同样在 RESTClient 的基础上进行了封装。
DiscoveryClient 除了可以发现 Kubernetes API Server 所支持的资源组、资源版本、资源信息,还可以将这些信息存储到本地,用于本地缓存(Cache),以减轻对 Kubernetes API Server 访问的压力,在运行 Kuvernetes 组件的机器上,缓存信息默认存储在 ~/.kube/cache 和 ~/.kube/http-cache 下。
类似 kubectl 命令,通过 DiscoveryClient 列出 Kubernetes API Server 所支持的资源组、资源版本、资源信息,代码示例如下:
func main() {
config, err := client.BuildConfigFromFlags("", "/root/.kube/config")
if err != nil {
panic(err)
}
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
panic(err)
}
_, APIResourceList, err := discoveryClient.ServerGroupsAndResources()
// 该函数会返回 Kubernetes API Server 所支持的资源组、资源版本、资源信息,通过遍历 APIResourceList 输出信息。
if err != nil {
panic(err)
}
for _, list := range APIResourceList {
gv, err := schema.ParaseGroupVersion(list.GroupVersion)
if err != nil {
panic(err)
}
for _, resource := range list.APIResources {
fmt.Printf("name: %v, group: %v, version: %v\n", resource.Name, gv.Group, gv.Version)
}
}
}
运行以上代码,列出 Kubernetes API Server 所支持的资源组、资源版本、资源信息。首先加载 kubeconfig 配置信息,discovery.NewDiscoveryClientForConfig 通过 kubeconfig 配置信息实例化 discoveryClient 对象,该对象是用于发现 Kubernetes API Server 所支持的资源组、资源版本、资源信息的客户端。
- 获取 Kubernetes API Server 所支持的资源组、资源版本、资源信息
Kubernetes API Server 暴露出 /api 和 /apis 接口。DiscoveryClient 通过 RESTClient 分别请求 /api 和 /apis 接口,从而获取 Kubernetes API Server 所支持的资源组、资源版本、资源信息。其核心实现位于 ServerGroupsAndResource -> ServerGroups 中,代码示例如下:
func (d *DiscoveryClient) ServerGroups() (apiGroupList *metav1.APIGroupList, err error) {
v := &metav1.APIVersion{}
err = d.restClient.Get().AbsPath(d.LegacyPrefix).Do().Into(v)
// 通过 RESTClient 请求 /api 接口,将请求结果存放于 metav1.APIVersion 结构体中。
...
apiGroupList = &metav1.APIGroupList{}
err = d.restClient.Get().AbsPath("/apis").Do().Into(apiGroupList)
// 通过 RESTClient 请求 /apis 接口,将请求结果存放于 metav1.APIGroupList 结构体中。
...
apiGroupList.Groups = append([]metav1.APIGroup{apiGroup}, apiGroupList.Groups...)
// 最后将 /api 接口中检索到的资源组信息合并到 apiGroupList 列表中并返回。
...
}
- 本地缓存的 DicoveryClient
缓存可以减轻 client-go 对 Kubernetes API Server 的访问压力。默认每 10 分钟与 Kubernetes API Server 同步一次。
DiscoveryClient 第一次获取资源组、资源版本、资源信息时,首先会查询本地缓存,如果数据不存在则请求 Kubernetes API Server 接口,Cache 将 Kubernetes API Server 响应的数据存储在本地一份并返回给 DiscoveryClient。当下一次 DiscoveryClient 再次获取资源信息时,会将数据直接从本地缓存返回给 DiscoveryClient。代码示例如下:
func (d *CacheDiscoveryClient) ServerResourceForGroupVersion (groupVersion string) (*metav1.APIResourceList, error) {
filename := filepath.Join(d.cacheDirectory, groupVersion, "serverresources.json")
cachedBytes, err := d.getCachedFile(filename)
if err == nil {
cachedResources := &metav1.APIResourceList{}
...
return cachedResources, nil
}
liveResources, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
...
if err := d.writeCachedFile(filename, liveResources); err != nil {
...
}
return liveResources, nil
}