【kubernetes/k8s源码分析】 ceph csi rbd plugin 源码分析

荆城
2023-12-01

     被external-provisioner发送CreateVolumeRequest GRPC请求执行rbd命令,创建pv

    分为三阶段就是 Provision Attach Mount

CreateVolume +------------+ DeleteVolume
 +------------->|  CREATED   +--------------+
 |              +---+----+---+              |
 |       Controller |    | Controller       v
+++         Publish |    | Unpublish       +++
|X|          Volume |    | Volume          | |
+-+             +---v----+---+             +-+
                | NODE_READY |
                +---+----^---+
               Node |    | Node
              Stage |    | Unstage
             Volume |    | Volume
                +---v----+---+
                |  VOL_READY |
                +------------+
               Node |    | Node
            Publish |    | Unpublish
             Volume |    | Volume
                +---v----+---+
                | PUBLISHED  |
                +------------+

The lifecycle of a dynamically provisioned volume, from
creation to destruction, when the Node Plugin advertises the
STAGE_UNSTAGE_VOLUME capability.
 

启动命令

    --endpoint:CSI endpoint (default "unix://tmp/csi.sock")

    --metadatastorage:metadata persistence method [node|k8s_configmap]

     rbdplugin --nodeid=master-node --endpoint=unix:///csi/csi.sock --v=5 --drivername=rbd.csi.ceph.com --containerized=true --metadatastorage=k8s_configmap

 

1. main函数

      根据类型是rbd,目录为 /var/lib/kubelet/plugins/rbd,driver name为 rbd.csi.ceph.com

switch driverType {
case rbdType:
	// Scope the plugin folder to this driver's name
	// (presumably yielding /var/lib/kubelet/plugins/rbd.csi.ceph.com — TODO confirm PluginFolder base).
	rbd.PluginFolder = rbd.PluginFolder + dname
	// Create the on-disk cache directories and pick the metadata persister
	// selected by the --metadatastorage flag; abort startup on failure.
	cp, err := util.CreatePersistanceStorage(
		rbd.PluginFolder, *metadataStorage, dname)
	if err != nil {
		os.Exit(1)
	}
	// Build the driver and run its (non-blocking) gRPC services.
	driver := rbd.NewDriver()
	driver.Run(dname, *nodeID, *endpoint, *configRoot, *containerized, cp)

    1.1 CreatePersistanceStorage创建路径初始化cache

      在/var/lib/kubelet/plugins/rbd.csi.ceph.com创建目录 controller与node

// CreatePersistanceStorage creates the "controller" and "node" persistence
// directories under sPath and returns a CachePersister chosen according to
// metaDataStore (see NewCachePersister). driverName is used to scope the
// node-local cache path.
func CreatePersistanceStorage(sPath, metaDataStore, driverName string) (CachePersister, error) {
	if err := createPersistentStorage(path.Join(sPath, "controller")); err != nil {
		klog.Errorf("failed to create persistent storage for controller: %v", err)
		return nil, err
	}

	if err := createPersistentStorage(path.Join(sPath, "node")); err != nil {
		klog.Errorf("failed to create persistent storage for node: %v", err)
		return nil, err
	}

	// Both directories exist; build the metadata cache persister.
	cp, err := NewCachePersister(metaDataStore, driverName)
	if err != nil {
		klog.Errorf("failed to define cache persistence method: %v", err)
		return nil, err
	}
	return cp, nil
}

 

2. NewCachePersister函数返回cache

    根据启动参数metadatastorage创建cache

// NewCachePersister returns a CachePersister based on the metadatastorage
// flag value: "k8s_configmap" persists metadata as kubernetes ConfigMaps in
// the pod's namespace, "node" persists it on the local filesystem under
// PluginFolder/<driverName>/controller. Any other value is an error.
func NewCachePersister(metadataStore, driverName string) (CachePersister, error) {
	switch metadataStore {
	case "k8s_configmap":
		// Fix: original log message misspelled "cache-perister".
		klog.Infof("cache-persister: using kubernetes configmap as metadata cache persister")
		k8scm := &K8sCMCache{}
		k8scm.Client = NewK8sClient()
		k8scm.Namespace = GetK8sNamespace()
		return k8scm, nil
	case "node":
		klog.Infof("cache-persister: using node as metadata cache persister")
		nc := &NodeCache{}
		nc.BasePath = PluginFolder + "/" + driverName
		nc.CacheDir = "controller"
		return nc, nil
	default:
		return nil, errors.New("cache-persister: couldn't parse metadatastorage flag")
	}
}

    2.1 如果metadatastore为k8s_configmap

      保存k8s客户端连接,与namespace

// K8sCMCache persists volume/snapshot metadata as kubernetes ConfigMaps.
type K8sCMCache struct {
	Client    *k8s.Clientset // kubernetes API client used to manage the ConfigMaps
	Namespace string         // namespace in which the metadata ConfigMaps are stored
}

    2.2 如果metadatastore为node

     使用/var/lib/kubelet/plugins/rbd.csi.ceph.com目录

// NodeCache persists volume/snapshot metadata on the local node filesystem.
type NodeCache struct {
	BasePath string // base directory, e.g. PluginFolder/<driverName>
	CacheDir string // subdirectory under BasePath holding the cache files
}

 

3. Driver Run函数

    路径pkg/rbd/rbd.go

    启动非阻塞grpc controller

// Run start a non-blocking grpc controller,node and identityserver for
// rbd CSI driver which can serve multiple parallel requests
func (r *Driver) Run(driverName, nodeID, endpoint, configRoot string, containerized bool, cachePersister util.CachePersister) {
	var err error
	klog.Infof("Driver: %v version: %v", driverName, version)

    3.1 NewConfigStore

    --configroot参数代表ceph cluster配置目录,k8s_objects代表k8s secrets,默认为 /etc/csi-config

// NewConfigStore returns a config store based on value of configRoot. If
// configRoot is not "k8s_objects" then it is assumed to be a path to a
// directory, under which the configuration files can be found
func NewConfigStore(configRoot string) (*ConfigStore, error) {
	// The sentinel value "k8s_objects" selects kubernetes objects (secrets)
	// as the backing store; anything else is treated as a directory path.
	if configRoot == "k8s_objects" {
		klog.Infof("cache-store: using k8s objects as config store")
		kc := &K8sConfig{}
		kc.Client = NewK8sClient()
		kc.Namespace = GetK8sNamespace()
		return &ConfigStore{kc}, nil
	}

	klog.Infof("cache-store: using files in path (%s) as config store", configRoot)
	fc := &FileConfig{}
	fc.BasePath = path.Clean(configRoot)
	return &ConfigStore{fc}, nil
}

    3.2 实例化CSIDriver

// CSIDriver stores driver information shared by the identity, controller
// and node services.
type CSIDriver struct {
	name    string // driver name, e.g. rbd.csi.ceph.com
	nodeID  string // ID of the node this instance runs on
	version string // driver (and assumed vendor) version string
	cap     []*csi.ControllerServiceCapability // advertised controller service capabilities
	vc      []*csi.VolumeCapability_AccessMode // supported volume access modes
}
// NewCSIDriver Creates a NewCSIDriver object. Assumes vendor
// version is equal to driver version &  does not support optional
// driver plugin info manifest field. Refer to CSI spec for more details.
func NewCSIDriver(name string, v string, nodeID string) *CSIDriver {
	// cap and vc start empty; they are populated later via the driver's
	// capability setters.
	return &CSIDriver{
		name:    name,
		version: v,
		nodeID:  nodeID,
	}
}

    3.3 实例化IdentityServer

      身份服务:Node Plugin和Controller Plugin都必须实现这些RPC集。协调kubernetes与csi的版本信息,

      负责对外暴露这个插件的信息

// NewIdentityServer initialize a identity server for rbd CSI driver
func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer {
	// Embed the common default identity server; rbd adds no extra state here.
	ids := &IdentityServer{}
	ids.DefaultIdentityServer = csicommon.NewDefaultIdentityServer(d)
	return ids
}

    3.4 实例化NodeServer

  • 节点服务:Node Plugin必须实现这些RPC集。 将volume存储卷挂载到指定目录中,/var/lib/kubelet/plugins/${plugin_name}/csi.sock
// NewNodeServer initialize a node server for rbd CSI driver.
// When containerized is true, mount operations are executed on the host via
// nsenter instead of inside the plugin container's mount namespace.
func NewNodeServer(d *csicommon.CSIDriver, containerized bool) (*NodeServer, error) {
	mounter := mount.New("")
	if containerized {
		ne, err := nsenter.NewNsenter(nsenter.DefaultHostRootFsPath, exec.New())
		if err != nil {
			return nil, err
		}
		// Replace the default mounter with one that enters the host namespace.
		mounter = mount.NewNsenterMounter("", ne)
	}
	ns := &NodeServer{
		DefaultNodeServer: csicommon.NewDefaultNodeServer(d),
		mounter:           mounter,
	}
	return ns, nil
}

    3.5 实例化ControllerServer

  • 控制器服务:Controller Plugin必须实现这些RPC集。创建以及管理volume管理卷
// NewControllerServer initialize a controller server for rbd CSI driver
func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer {
	// The controller keeps the chosen metadata persister so volume/snapshot
	// state can be stored and reloaded across restarts.
	cs := &ControllerServer{}
	cs.DefaultControllerServer = csicommon.NewDefaultControllerServer(d)
	cs.MetadataStore = cachePersister
	return cs
}

    3.6 load rbd 卷和snapshot

      从metadata store(如k8s configmap)中读取rbd volume与snapshot信息

// LoadExDataFromMetadataStore loads the rbd volume and snapshot
// info from metadata store into the in-memory rbdVolumes / rbdSnapshots maps.
func (cs *ControllerServer) LoadExDataFromMetadataStore() error {
	vol := &rbdVolume{}
	// nolint
	cs.MetadataStore.ForAll("csi-rbd-vol-", vol, func(identifier string) error {
		// Fix: ForAll decodes every entry into the same vol object; storing
		// the shared pointer would make every map entry alias the LAST
		// decoded volume. Store a copy per identifier instead.
		v := *vol
		rbdVolumes[identifier] = &v
		return nil
	})

	snap := &rbdSnapshot{}
	// nolint
	cs.MetadataStore.ForAll("csi-rbd-(.*)-snap-", snap, func(identifier string) error {
		// Same aliasing fix as above for snapshots.
		s := *snap
		rbdSnapshots[identifier] = &s
		return nil
	})

	klog.Infof("Loaded %d volumes and %d snapshots from metadata store", len(rbdVolumes), len(rbdSnapshots))
	return nil
}

    3.7 建立GRPC server

     endpoint为unix:///csi/csi.sock

// Start the non-blocking gRPC server exposing the identity, controller and
// node services on the CSI endpoint, then block until it terminates.
s := csicommon.NewNonBlockingGRPCServer()
s.Start(endpoint, r.ids, r.cs, r.ns)
s.Wait()

 

4. CreateVolume

    4.1 checkRBDStatus函数

        主要逻辑函数,首先调用rbdStatus执行命令,

# rbd status pvc-d0223724-75f7-11e9-be7d-0800271c9f15 --pool rbd
Watchers: none

        若无watcher则调用createRBDImage创建RBD image

        rbd create pvc-xxxxxxxxxxxxxxx --size xxx --pool xxxx --id xxx -m xxx --key=xxx --image-format xxxx

// checkRBDStatus ensures the requested RBD image exists: if an image with the
// requested name is already present it does nothing; otherwise it either
// restores from a snapshot (when the request carries a VolumeContentSource)
// or creates a fresh image of volSizeMiB.
func (cs *ControllerServer) checkRBDStatus(rbdVol *rbdVolume, req *csi.CreateVolumeRequest, volSizeMiB int) error {
	// Check if there is already RBD image with requested name
	//nolint
	found, _, _ := rbdStatus(rbdVol, rbdVol.UserID, req.GetSecrets())
	if found {
		return nil
	}

	// if VolumeContentSource is not nil, this request is for snapshot
	if req.VolumeContentSource != nil {
		return cs.checkSnapshot(req, rbdVol)
	}

	if err := createRBDImage(rbdVol, volSizeMiB, rbdVol.AdminID, req.GetSecrets()); err != nil {
		klog.Warningf("failed to create volume: %v", err)
		return status.Error(codes.Internal, err.Error())
	}

	klog.V(4).Infof("create volume %s", rbdVol.VolName)
	return nil
}

    4.2 storeVolumeMetadata函数

      根据--metadatastorage参数,如果指定k8s_configmap则调用client-go创建configmap

// storeVolumeMetadata persists vol (keyed by its VolID) through the given
// cache persister, logging any failure before propagating it.
func storeVolumeMetadata(vol *rbdVolume, cp util.CachePersister) error {
	err := cp.Create(vol.VolID, vol)
	if err != nil {
		klog.Errorf("failed to store metadata for volume %s: %v", vol.VolID, err)
	}
	return err
}

# kubectl get cm csi-rbd-vol-14bb647a-7608-11e9-88ff-8288f6d6cbd1 -nkube-csi -o yaml
apiVersion: v1
data:
  content: '{"volName":"pvc-1589da3b-7608-11e9-be7d-0800271c9f15","volID":"csi-rbd-vol-14bb647a-7608-11e9-88ff-8288f6d6cbd1","monitors":"192.168.74.57:6789","monValueFromSecret":"","pool":"rbd","imageFormat":"2","imageFeatures":"layering","volSize":1073741824,"adminId":"admin","userId":"admin","mounter":"rbd","disableInUseChecks":false,"clusterId":""}'
kind: ConfigMap
metadata:
  creationTimestamp: "2019-05-14T05:21:25Z"
  labels:
    com.ceph.ceph-csi/metadata: csi-metadata
  name: csi-rbd-vol-14bb647a-7608-11e9-88ff-8288f6d6cbd1
  namespace: kube-csi
  resourceVersion: "515518"
  selfLink: /api/v1/namespaces/kube-csi/configmaps/csi-rbd-vol-14bb647a-7608-11e9-88ff-8288f6d6cbd1
  uid: 1e7c0d87-7608-11e9-be7d-0800271c9f15

 

5.  NodePublishVolume函数

  NodePublishVolume mounts the volume mounted to the device path to the target path

{"secrets":"***stripped***","target_path":"/var/lib/kubelet/pods/f4a98880-761b-11e9-be7d-0800271c9f15/volumes/kubernetes.io~csi/pvc-1589da3b-7608-11e9-be7d-0800271c9f15/mount","volume_capability":{"AccessType":{"Mount":{"fs_type":"ext4"}},"access_mode":{"mode":1}}

    5.1  Mapping RBD image

      路径/sys/bus/rbd/devices/x 目录中的name 与pool,与rook flexvolume插件实现类似

// Search /sys/bus for rbd device that matches given pool and image.
// Returns the /dev/rbdX path and true on a match, or ("", false) otherwise.
func getRbdDevFromImageAndPool(pool string, image string) (string, bool) {
	// Each /sys/bus/rbd/devices/X directory exposes "pool" and "name" files.
	sysPath := "/sys/bus/rbd/devices"
	dirs, err := ioutil.ReadDir(sysPath)
	if err != nil {
		return "", false
	}
	for _, f := range dirs {
		// Pool and name format:
		// see rbd_pool_show() and rbd_name_show() at
		// https://github.com/torvalds/linux/blob/master/drivers/block/rbd.c
		dev := f.Name()
		// First match pool, then match name.
		poolFile := path.Join(sysPath, dev, "pool")
		// #nosec
		poolBytes, err := ioutil.ReadFile(poolFile)
		if err != nil {
			klog.V(4).Infof("error reading %s: %v", poolFile, err)
			continue
		}
		if strings.TrimSpace(string(poolBytes)) != pool {
			klog.V(4).Infof("device %s is not %q: %q", dev, pool, string(poolBytes))
			continue
		}
		imgFile := path.Join(sysPath, dev, "name")
		// #nosec
		imgBytes, err := ioutil.ReadFile(imgFile)
		if err != nil {
			klog.V(4).Infof("error reading %s: %v", imgFile, err)
			continue
		}
		if strings.TrimSpace(string(imgBytes)) != image {
			klog.V(4).Infof("device %s is not %q: %q", dev, image, string(imgBytes))
			continue
		}
		// Found a match, check if device exists.
		devicePath := "/dev/rbd" + dev
		if _, err := os.Lstat(devicePath); err == nil {
			return devicePath, true
		}
	}
	return "", false
}

    5.2 mountVolume函数

       主要是format与mount操作,挂载在node节点的路径下/dev/rbd0 to /var/lib/kubelet/pods/f4a98880-761b-11e9-be7d-0800271c9f15/volumes/kubernetes.io~csi/pvc-1589da3b-7608-11e9-be7d-0800271c9f15/mount

target /var/lib/kubelet/pods/f4a98880-761b-11e9-be7d-0800271c9f15/volumes/kubernetes.io~csi/pvc-1589da3b-7608-11e9-be7d-0800271c9f15/mount
isBlock false
fstype ext4
device /dev/rbd0
readonly false
attributes map[storage.kubernetes.io/csiProvisionerIdentity:1557810988918-8081- adminid:admin imageFeatures:layering imageFormat:2 monitors:192.168.74.57:6789 pool:rbd]
 mountflags []     

// mountVolume attaches the mapped device at devicePath to the request's
// target path: raw block volumes are bind-mounted, filesystem volumes are
// formatted (if needed) and mounted via SafeFormatAndMount.
func (ns *NodeServer) mountVolume(req *csi.NodePublishVolumeRequest, devicePath string) error {
	// Publish Path
	volCap := req.GetVolumeCapability()
	fsType := volCap.GetMount().GetFsType()
	readOnly := req.GetReadonly()
	attrib := req.GetVolumeContext()
	mountFlags := volCap.GetMount().GetMountFlags()
	isBlock := volCap.GetBlock() != nil
	targetPath := req.GetTargetPath()

	klog.V(4).Infof("target %v\nisBlock %v\nfstype %v\ndevice %v\nreadonly %v\nattributes %v\n mountflags %v\n",
		targetPath, isBlock, fsType, devicePath, readOnly, attrib, mountFlags)

	diskMounter := &mount.SafeFormatAndMount{Interface: ns.mounter, Exec: mount.NewOsExec()}

	if isBlock {
		// Raw block: bind-mount the device node itself onto the target path.
		return diskMounter.Mount(devicePath, targetPath, fsType, []string{"bind"})
	}

	options := []string{}
	if readOnly {
		options = append(options, "ro")
	}
	return diskMounter.FormatAndMount(devicePath, targetPath, fsType, options)
}

 

6.  ControllerPublishVolume函数

// ControllerPublishVolume returns success response
// (rbd attach happens on the node via the kernel rbd map, so there is
// nothing for the controller to do here).
func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
	resp := &csi.ControllerPublishVolumeResponse{}
	return resp, nil
}

 

Dockerfile

# Ceph-CSI image: ships a single cephcsi binary, symlinked under
# per-driver names so the same image serves both rbd and cephfs.
FROM ceph/ceph:v14.2
LABEL maintainers="Ceph-CSI Authors"
LABEL description="Ceph-CSI Plugin"

ENV CSIBIN=/usr/local/bin/cephcsi

COPY cephcsi $CSIBIN

# Make the binary executable and expose driver-specific entry names.
RUN chmod +x $CSIBIN && \
    ln -sf $CSIBIN /usr/local/bin/cephcsi-rbd && \
    ln -sf $CSIBIN /usr/local/bin/cephcsi-cephfs

ENTRYPOINT ["/usr/local/bin/cephcsi"]

 

FROM centos:7
LABEL maintainers="Kubernetes Authors"
LABEL description="RBD CSI Plugin"

ENV CEPH_VERSION "mimic"
# Install the ceph CLI (ceph-common), the filesystem tools needed to
# format/mount RBD images (e2fsprogs for ext4, xfsprogs for xfs) and rbd-nbd.
# Fix: the original had a trailing space after a line-continuation
# backslash, which breaks the shell continuation and the RUN command.
RUN yum install -y centos-release-ceph && \
    yum install -y ceph-common e2fsprogs xfsprogs rbd-nbd && \
    yum clean all

COPY rbdplugin /rbdplugin
RUN chmod +x /rbdplugin
ENTRYPOINT ["/rbdplugin"]

 类似资料: