【kubernetes/k8s源码分析】 k8s csi plugin attach mount源码分析

太叔飞翰
2023-12-01

    本文接着《kubelet pod 挂载 volume 源码分析》一文继续分析:https://blog.csdn.net/zhonglinzhang/article/details/89923875

    volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher(),如果使用CSI插件则调用如下:

// NewAttacher builds the volume.Attacher used for CSI volumes.
//
// The Kubernetes client is taken from the plugin host. When the host has no
// client, the failure is logged and an error is returned; otherwise the
// result is a csiAttacher bound to this plugin with a one-second
// waitSleepTime.
func (p *csiPlugin) NewAttacher() (volume.Attacher, error) {
	kubeClient := p.host.GetKubeClient()
	if kubeClient == nil {
		klog.Error(log("unable to get kubernetes client from host"))
		return nil, errors.New("unable to get Kubernetes client")
	}

	attacher := &csiAttacher{
		plugin:        p,
		k8s:           kubeClient,
		waitSleepTime: time.Second,
	}
	return attacher, nil
}

        csiAttacher结构实现了Attach方法

// csiAttacher is the attacher type returned by csiPlugin.NewAttacher.
type csiAttacher struct {
	// plugin is the csiPlugin that created this attacher.
	plugin        *csiPlugin
	// k8s is the Kubernetes API client obtained from the plugin host; it is
	// used to read VolumeAttachment objects.
	k8s           kubernetes.Interface
	// waitSleepTime is set to one second by NewAttacher.
	// NOTE(review): presumably the polling interval while waiting for an
	// attachment — confirm against the wait loop.
	waitSleepTime time.Duration

	// csiClient is left at its zero value by NewAttacher; where it gets
	// assigned is not visible in this excerpt.
	csiClient csiClient
}

 

    创建挂盘操作,协调挂盘处理流程

reconcile

      --> operationExecutor.VerifyControllerAttachedVolume

      --> operationExecutor.MountVolume

               --> GenerateMountVolumeFunc

                      --> volumePlugin.NewMounter 第3章节讲解

                      --> attachableVolumePlugin.NewAttacher()

                      --> volumeAttacher.WaitForAttach 第1章节讲解

                      --> volumeDeviceMounter.MountDevice

                      --> volumeMounter.SetUp(fsGroup) 第4章节讲解

 

  node节点信息

  volumesAttached:
  - devicePath: csi-57bf2260979246d9fc2d571fc7b886854ddc82035e3376fe25c5185b46639104
    name: kubernetes.io/csi/rbd.csi.ceph.com^csi-rbd-vol-9f15531e-76e6-11e9-8f1d-a640193e1ea4
  volumesInUse:
  - kubernetes.io/csi/rbd.csi.ceph.com^csi-rbd-vol-9f15531e-76e6-11e9-8f1d-a640193e1ea4

 

1. WaitForAttach

func (c *csiAttacher) WaitForAttach(spec *volume.Spec, _ string, pod *v1.Pod, timeout time.Duration) (string, error) {

    1.1 getCSISourceFromSpec函数

       getCSISourceFromSpec根据pv得到如下信息

  csi:
    driver: rbd.csi.ceph.com
    fsType: ext4
    nodePublishSecretRef:
      name: csi-rbd-secret
      namespace: default
    volumeAttributes:
      adminid: admin
      imageFeatures: layering
      imageFormat: "2"
      monitors: 192.168.74.57:6789
      pool: rbd
      storage.kubernetes.io/csiProvisionerIdentity: 1557904506281-8081-
    volumeHandle: csi-rbd-vol-9f15531e-76e6-11e9-8f1d-a640193e1ea4

    1.2 最终逻辑函数,看看干什么操作了

func (c *csiAttacher) waitForVolumeAttachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) (string, error) {
	klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
	attach, err := c.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{})
	if err != nil {
		klog.Error(log("attacher.WaitForAttach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
		return "", fmt.Errorf("volume %v has GET error for volume attachment %v: %v", volumeHandle, attachID, err)
	}

 

2. waitForVolumeAttachmentInternal

      等待插件进行attach操作,接着参看kubernetes csi external-attacher 源码分析 https://blog.csdn.net/zhonglinzhang/article/details/89643001

      external-attacher watch到volumeAttachments资源,则处理,最终发送GRPC请求ControllerPublishVolumeRequest

 

      2.1 只要status为attached则算成功

// verifyAttachmentStatus inspects a VolumeAttachment and reports whether the
// volume is attached.
//
// It returns:
//   - (false, error) when the attachment carries a deletion timestamp,
//   - (true, nil) when Status.Attached is set,
//   - (false, error) carrying the driver's message when Status.AttachError is set,
//   - (false, nil) otherwise, i.e. the attachment is still pending.
//
// volumeHandle is only used in the log message for a driver-reported error.
func verifyAttachmentStatus(attachment *storage.VolumeAttachment, volumeHandle string) (bool, error) {
	switch {
	case attachment.GetDeletionTimestamp() != nil:
		// Being deleted: fail fast instead of waiting any longer.
		klog.Error(log("VolumeAttachment [%s] has deletion timestamp, will not continue to wait for attachment", attachment.Name))
		return false, errors.New("volume attachment is being deleted")

	case attachment.Status.Attached:
		return true, nil

	case attachment.Status.AttachError != nil:
		// The driver reported an attach failure; surface its message.
		reason := attachment.Status.AttachError.Message
		klog.Error(log("attachment for %v failed: %v", volumeHandle, reason))
		return false, errors.New(reason)

	default:
		// Neither attached nor failed yet.
		return false, nil
	}
}

 

3. NewMounter函数

    路径 pkg/volume/csi/csi_mounter.go,csiMountMgr结构体

// Build the csiMountMgr for this pod and volume. The driver name and the
// volume ID come from the PV's CSI source (pvSource); specVolumeID is the
// name of the volume spec.
mounter := &csiMountMgr{
	plugin:       p,
	k8s:          k8s,
	spec:         spec,
	pod:          pod,
	podUID:       pod.UID,
	driverName:   csiDriverName(pvSource.Driver),
	volumeID:     pvSource.VolumeHandle,
	specVolumeID: spec.Name(),
	readOnly:     readOnly,
}
// Give the embedded client getter the same driver name.
// NOTE(review): presumably so it can resolve the CSI client for this driver
// on demand — confirm against csiClientGetter.Get.
mounter.csiClientGetter.driverName = csiDriverName(pvSource.Driver)

 

4. SetUpAt函数

     不知道为啥给函数起了这么个奇怪的名字

    4.1 与CSI插件建立socket连接

// Resolve the CSI persistent volume source from the mounter's spec; on
// failure, log and return the error.
csiSource, err := getCSISourceFromSpec(c.spec)
if err != nil {
	klog.Error(log("mounter.SetupAt failed to get CSI persistent source: %v", err))
	return err
}
// NOTE(review): the "zzlin" prefix looks like an ad-hoc debug trace added by
// the article author rather than a regular log line — confirm before reuse.
klog.Infof("zzlin SetUpAt csiSource: %v", csiSource.String())

// Obtain the CSI client through the mounter's client getter; on failure, log
// and return the error.
csi, err := c.csiClientGetter.Get()
if err != nil {
	klog.Error(log("mounter.SetUpAt failed to get CSI client: %v", err))
	return err
}
// Derive a context bounded by csiTimeout; the deferred cancel releases it
// when the enclosing function returns.
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()

    4.2 发送GRPC请求NodePublishVolume

// Call NodePublishVolume on the CSI client with the timeout-bound context.
// The filesystem type comes from the CSI source and the mount options from
// the PersistentVolume spec. deviceMountPath, dir, accessMode, attribs and
// nodePublishSecrets are defined in parts of the function not shown in this
// excerpt, as is the handling of the returned err.
fsType := csiSource.FSType
err = csi.NodePublishVolume(
	ctx,
	c.volumeID,
	c.readOnly,
	deviceMountPath,
	dir,
	accessMode,
	c.publishContext,
	attribs,
	nodePublishSecrets,
	fsType,
	c.spec.PersistentVolume.Spec.MountOptions,
)

 

 类似资料: