快速入门 Kubernetes&CRD&Operator 篇五

/ K8sCRDOperator / 没有评论 / 3570浏览

202016202759-k8s

快速入门 Kubernetes&CRD&Operator

Kubernetes 是一个容器管理系统。 具体功能:

通过阅读Kubernetes指南Kubernetes HandBook以及官方文档 或者 阅读 Kubernetes权威指南可以获得更好的学习体验。

在开始安装 Kubernetes 之前,我们需要知道:

1、Docker与Kubernetes Docker 是一个容器运行时的实现,Kubernetes 依赖于某种容器运行时的实现。

2、Pod Kubernetes 中最基本的调度单位是 PodPod 从属于 Node(物理机或虚拟机),Pod 中可以运行多个 Docker 容器,会共享 PID、IPC、NetworkUTS namespacePod 在创建时会被分配一个 IP 地址,Pod 间的容器可以互相通信。

3、Yaml Kubernetes 中有着很多概念,它们都算做是一种对象,如 Pod、Deployment、Service 等,都可以通过一个 yaml 文件来进行描述,并可以对这些对象进行 CRUD 操作(对应 REST 中的各种 HTTP 方法)。

下面一个 Podyaml 文件示例:

apiVersion: v1
kind: Pod
metadata:
  name: my-nginx-app
  labels:
    app: my-nginx-app
spec:
  containers:
  - name: nginx
    image: nginx:1.7.9
    ports:
    - containerPort: 80

4、Node NodePod 真正运行的主机,可以是物理机,也可以是虚拟机。为了管理 Pod,每个 Node 节点上至少要运行 container runtime、kubeletkube-proxy 服务。

5、Deployment Deployment 用于管理一个无状态应用,对应一个 Pod 的集群,每个 Pod 的地位是对等的,对 Deployment 来说只是用于维护一定数量的 Pod,这些 Pod 有着相同的 Pod 模板。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-nginx-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: my-nginx-app
  template:
    metadata:
      labels:
        app: my-nginx-app
    spec:
      containers:
      - name: nginx
        image: nginx:1.7.9
        ports:
        - containerPort: 80

可以对 Deployment 进行部署、升级、扩缩容等操作。

6、Service Service 用于将一组 Pod 暴露为一个服务。

kubernetes 中,PodIP 地址会随着 Pod 的重启而变化,并不建议直接拿 PodIP 来交互。那如何来访问这些 Pod 提供的服务呢?使用 ServiceService 为一组 Pod(通过 labels 来选择)提供一个统一的入口,并为它们提供负载均衡和自动服务发现。

apiVersion: v1
kind: Service
metadata:
  name: my-nginx-app
  labels:
    name: my-nginx-app
spec:
  type: NodePort      #这里代表是NodePort类型的
  ports:
  - port: 80          # 这里的端口和clusterIP(10.97.114.36)对应,即10.97.114.36:80,供内部访问。
    targetPort: 80    # 端口一定要和container暴露出来的端口对应
    protocol: TCP
    nodePort: 32143   # 每个Node会开启,此端口供外部调用。
  selector:
    app: my-nginx-app

7、Kubernetes组件

安装 Kubernetes【Minikube】

minikube 为开发或者测试在本地启动一个节点的 kubernetes 集群,minikube 打包了和配置一个 linux 虚拟机、dockerkubernetes 组件。

Kubernetes 集群是由 MasterNode 组成的,Master 用于进行集群管理,Node 用于运行 Podworkload。而minikube 是一个 Kubernetes 集群的最小集。

1、安装virtualbox virtualbox

2、安装minikube

curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube && sudo mv minikube /usr/local/bin/

3、启用dashboard(web console)【可选】

minikube addons enable dashboard

4、启动minikube

minikube start

start 之后可以通过 minikube status 来查看状态,如果 minikubecluster 都是 Running,则说明启动成功。

5、查看启动状态

kubectl get pods

kubectl体验【以一个Deployment为例】

kubectl 是一个命令行工具,用于向 API Server 发送指令。我们以部署、升级、扩缩容一个 Deployment、发布一个 Service 为例体验一下 Kubernetes。 命令的通常格式为:

kubectl $operation $object_type(单数or复数) $object_name other params

kubectl命令表:

Kubernetes kubectl create 命令详解

1、创建一个Deployment 可以使用 kubectl run 来运行,也可以基于现有的 yaml 文件来 create

kubectl run –image=nginx:1.7.9 nginx-app –port=80

或者

kubectl create -f my-nginx-deployment.yaml
# my-nginx-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-nginx-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: my-nginx-app
  template:
    metadata:
      labels:
        app: my-nginx-app
    spec:
      containers:
      - name: nginx
        image: nginx:1.7.9
        ports:
        - containerPort: 80

然后可以通过 kubectl get pods来查看创建好了的 3Pod

$ kubectl get pods
NAME                            READY   STATUS              RESTARTS   AGE
my-nginx-app-6f647db65c-9w8kx   0/1     ContainerCreating   0          8s
my-nginx-app-6f647db65c-dmhrx   0/1     ContainerCreating   0          8s
my-nginx-app-6f647db65c-rbp9s   0/1     ContainerCreating   0          8s

再通过 kubectl get deployments 来查看创建好了的 deployment

$ kubectl get deployments
NAME           READY   UP-TO-DATE   AVAILABLE   AGE
my-nginx-app   3/3     3            3           51s

这里有4列,分别是:

2、删除掉任意一个Pod Deployment 自身拥有副本保持机制,会始终将其所管理的 Pod 数量保持为 spec 中定义的 replicas 数量。

# 打开一个新终端
$ kubectl get pods -w -l app=my-nginx-app
# 删除指定pod
$ kubectl delete pods $pod_name

可以看出被删掉的 Pod 的关闭与代替它的 Pod 的启动过程。

3、缩扩容 缩扩容有两种实现方式:

4、更新 更新也是有两种实现方式,Kubernetes 的升级可以实现无缝升级,即不需要进行停机。

#方法一
kubectl replace -f my-nginx-deployment.yaml
#方法二
kubectl set image $resource_type/$resource_name $container_name=nginx:1.9.1
# 查看
kubectl get pods -o yaml

5、暴露服务 暴露服务也有两种实现方式,一种是通过 kubectl create -f my-nginx-service.yaml 可以创建一个服务:

# my-nginx-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: my-nginx-app
  labels:
    name: my-nginx-app
spec:
  type: NodePort      #这里代表是NodePort类型的
  ports:
  - port: 80          # 这里的端口和clusterIP,供内部访问。
    targetPort: 80    # 端口一定要和 pod container 暴露出来的端口对应(上面配置的端口)
    protocol: TCP
    nodePort: 32143   # 每个 Node 会开启,此端口供外部调用。(每个Pod对外暴露的端口)
  selector:
    app: my-nginx-app

ports 中有三个端口,第一个 portPod 供内部访问暴露的端口,第二个 targetPortPodContainer 中配置的 另一种是通过 kubectl expose 命令实现。 minikube ip 返回的就是 minikube 所管理的 Kubernetes 集群所在的虚拟机 ip

minikube service my-nginx-app --url -p zzz-cluster
http://192.168.39.121:32143

CRD【CustomResourceDefinition】

CRDKubernetes 为提高可扩展性,让开发者去自定义资源(如 Deployment,StatefulSet 等)的一种方法。

Operator = CRD + Controller

CRD 仅仅是资源的定义,而 Controller 可以去监听 CRDCRUD 事件来添加自定义业务逻辑。

关于CRD有一些链接先贴出来: Extend the Kubernetes API with CustomResourceDefinitions

如果说只是对 CRD 实例进行 CRUD 的话,不需要 Controller 也是可以实现的,只是只有数据,没有针对数据的操作。

一个 CRDyaml 文件示例:

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  # name must match the spec fields below, and be in the form: <plural>.<group>
  name: crontabs.stable.example.com
spec:
  # group name to use for REST API: /apis/<group>/<version>
  group: stable.example.com
  # list of versions supported by this CustomResourceDefinition
  version: v1beta1
  # either Namespaced or Cluster
  scope: Namespaced
  names:
    # plural name to be used in the URL: /apis/<group>/<version>/<plural>
    plural: crontabs
    # singular name to be used as an alias on the CLI and for display
    singular: crontab
    # kind is normally the CamelCased singular type. Your resource manifests use this.
    kind: CronTab
    # shortNames allow shorter string to match your resource on the CLI
    shortNames:
    - ct

通过 kubectl create -f crd.yaml 可以创建一个 CRD

Operator

我们平时在部署一个简单的 Webserver 到 Kubernetes 集群中的时候,都需要先编写一个 Deployment 的控制器,然后创建一个 Service 对象,通过 Pod 的 label 标签进行关联,最后通过 Ingress 或者 type=NodePort 类型的 Service 来暴露服务,每次都需要这样操作,是不是略显麻烦,我们就可以创建一个自定义的资源对象,通过我们的 CRD 来描述我们要部署的应用信息,比如镜像、服务端口、环境变量等等,然后创建我们的自定义类型的资源对象的时候,通过控制器去创建对应的 Deployment 和 Service,是不是就方便很多了,相当于我们用一个资源清单去描述了 Deployment 和 Service 要做的两件事情。

Operator-SDK

Operator 是由 CoreOS 开发的,用来扩展 Kubernetes API,特定的应用程序控制器,它用来创建、配置和管理复杂的有状态应用,如数据库、缓存和监控系统。Operator 基于 Kubernetes 的资源和控制器概念之上构建,但同时又包含了应用程序特定的领域知识。创建Operator 的关键是CRD(自定义资源)的设计。

Workflow

Operator SDK 提供以下工作流来开发一个新的 Operator

  1. 使用 SDK 创建一个新的 Operator 项目
  2. 通过添加自定义资源(CRD)定义新的资源 API
  3. 指定使用 SDK API 来 watch 的资源
  4. 定义 Operator 的协调(reconcile)逻辑
  5. 使用 Operator SDK 构建并生成 Operator 部署清单文件

创建新项目

$ operator-sdk new cassandra-operator --repo=github.com/zealzhangz/cassandra-operator
$ tree
.
├── build
│   ├── Dockerfile
│   └── bin
│       ├── entrypoint
│       └── user_setup
├── cassandra-operator.iml
├── cmd
│   └── manager
│       └── main.go
├── deploy
│   ├── operator.yaml
│   ├── role.yaml
│   ├── role_binding.yaml
│   └── service_account.yaml
├── go.mod
├── go.sum
├── pkg
│   ├── apis
│   │   └── apis.go
│   └── controller
│       └── controller.go
├── tools.go
└── version
    └── version.go

添加 API

接下来为我们的自定义资源添加一个新的 API,按照上面我们预定义的资源清单文件,在 Operator 相关根目录下面执行如下命令:

# 等个半分钟就创建好了,大概新增了10个文件左右
$ operator-sdk add api --api-version=cassandra.zhangaoo.com/v1alpha1 --kind=CassandraService

添加控制器

上面我们添加自定义的 API,接下来可以添加对应的自定义 API 的具体实现 Controller,同样在项目根目录下面执行如下命令

$ operator-sdk add controller --api-version=cassandra.zhangaoo.com/v1alpha1 --kind=CassandraService

自定义 API

打开源文件 pkg/apis/cassandra/v1alpha1/cassandraservice_types.go ,需要我们根据我们的需求去自定义结构体 CassandraServiceSpec,我们最上面预定义的资源清单中就有 size、image、ports 这些属性,所有我们需要用到的属性都需要在这个结构体中进行定义

type CassandraServiceSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file
	// Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html
	Size	*int32		`json:"size"`
	Image   string 		`json:"image"`
	Resources corev1.ResourceRequirements `json:"resources,omitempty"`
	Envs      []corev1.EnvVar             `json:"envs,omitempty"`
	Ports     []corev1.ServicePort        `json:"ports,omitempty"`
}
type CassandraServiceStatus struct {
	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
	// Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file
	// Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html
	appsv1.StatefulSetStatus `json:",inline"`
}

定义完成后,在项目根目录下面执行如下命令:

$ operator-sdk generate k8s

实现业务逻辑

上面 API 描述声明完成了,接下来就需要我们来进行具体的业务逻辑实现了,编写具体的 controller 实现,打开源文件 pkg/controller/cassandraservice/cassandraservice_controller.go ,需要我们去更改的地方也不是很多,核心的就是 Reconcile 方法,该方法就是去不断的 watch 资源的状态,然后根据状态的不同去实现各种操作逻辑

// Reconcile reads that state of the cluster for a CassandraService object and makes changes based on the state read
// and what is in the CassandraService.Spec
// TODO(user): Modify this Reconcile function to implement your Controller logic.  This example creates
// a Pod as an example
// Note:
// The Controller will requeue the Request to be processed again if the returned error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (r *ReconcileCassandraService) Reconcile(request reconcile.Request) (reconcile.Result, error) {
	reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
	reqLogger.Info("Reconciling CassandraService")

	// Fetch the CassandraService instance
	instance := &cassandrav1alpha1.CassandraService{}
	err := r.client.Get(context.TODO(), request.NamespacedName, instance)
	if err != nil {
		if errors.IsNotFound(err) {
			// Request object not found, could have been deleted after reconcile request.
			// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
			// Return and don't requeue
			return reconcile.Result{}, nil
		}
		// Error reading the object - requeue the request.
		return reconcile.Result{}, err
	}
	//Service
	// Check if this Service already exists
	serviceFound := &corev1.Service{}
	err = r.client.Get(context.TODO(), types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace}, serviceFound)
	if err != nil && errors.IsNotFound(err) {
		reqLogger.Info("Creating a new Service", "Service.Namespace", instance.Namespace, "Service.Name", instance.Name)

		// Define a new Service object
		service := r.newServiceForCr(instance)

		// Set CassandraService instance as the owner and controller
		if err := controllerutil.SetControllerReference(instance, service, r.scheme); err != nil {
			return reconcile.Result{}, err
		}

		err = r.client.Create(context.TODO(), service)
		if err != nil {
			return reconcile.Result{}, err
		}
		// Service created successfully - don't requeue
		//return reconcile.Result{}, nil //todo
	} else if err != nil {
		return reconcile.Result{}, err
	}
	// Service already exists - don't requeue
	reqLogger.Info("Skip reconcile: Service already exists", "Service.Namespace", instance.Namespace, "Service.Name", instance.Name)

	//StatefulSet
	// Check if this Service already exists
	statefulSetFound := &appsv1.StatefulSet{}
	err = r.client.Get(context.TODO(), types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace}, statefulSetFound)
	if err != nil && errors.IsNotFound(err) {
		reqLogger.Info("Creating a new StatefulSet", "StatefulSet.Namespace", instance.Namespace, "StatefulSet.Name", instance.Name)

		// Define a new StatefulSet object
		statefulSet := r.newStatefulSetForCr(instance)

		// Set CassandraService instance as the owner and controller
		if err := controllerutil.SetControllerReference(instance, statefulSet, r.scheme); err != nil {
			return reconcile.Result{}, err
		}

		err = r.client.Create(context.TODO(), statefulSet)
		if err != nil {
			return reconcile.Result{}, err
		}
		// StatefulSet created successfully - don't requeue
		return reconcile.Result{}, nil
	} else if err != nil {
		return reconcile.Result{}, err
	}

	// Ensure the statefulset size is the same as the spec
	reqLogger.Info("Matching size in spec")
	size := instance.Spec.Size
	if *statefulSetFound.Spec.Replicas != *size {
		statefulSetFound.Spec.Replicas = size
		err = r.client.Update(context.TODO(),statefulSetFound)
		if err != nil{
			reqLogger.Info("Failed to update StatefulSet: %v\n", err)
			return reconcile.Result{}, err
		}
		reqLogger.Info("Spec was updated, so request is getting re-queued")
		// Spec updated - return and requeue
		return reconcile.Result{Requeue: true}, nil
	}

	// StatefulSet already exists - don't requeue
	reqLogger.Info("Skip reconcile: CanssandraService already exists", "CanssandraService.Namespace", instance.Namespace, "CanssandraService.Name", instance.Name)
	return reconcile.Result{}, nil
}

调试

$ operator-sdk up local
apiVersion: cassandra.zhangaoo.com/v1alpha1
kind: CassandraService
metadata:
  name: cassandra
  labels:
    app: cassandra
spec:
  # Add fields here
  size: 3
  image: gcr.io/google-samples/cassandra:v13
  ports:
  - containerPort: 7000
    name: intra-node
  - containerPort: 7001
    name: tls-intra-node
  - containerPort: 7199
    name: jmx
  - containerPort: 9042
    name: cql
  - port: 9042
  resources:
    limits:
      cpu: "500m"
      memory: 1Gi
    requests:
      cpu: "500m"
      memory: 1Gi
  env:
  - name: MAX_HEAP_SIZE
    value: 512M
  - name: HEAP_NEWSIZE
    value: 100M
  - name: CASSANDRA_SEEDS
    value: "cassandra-0.cassandra.default.svc.cluster.local"
  - name: CASSANDRA_CLUSTER_NAME
    value: "K8Demo"
  - name: CASSANDRA_DC
    value: "DC1-K8Demo"
  - name: CASSANDRA_RACK
    value: "Rack1-K8Demo"
  - name: POD_IP
    valueFrom:
        ieldRef:
          fieldPath: status.podIP

参考资料

Kubernetes Operator 101