external-provisioner 发送 CreateVolumeRequest gRPC 请求,插件收到后执行 rbd 命令创建 PV。
整个流程分为三个阶段:Provision、Attach、Mount。
   CreateVolume +------------+ DeleteVolume
 +------------->|  CREATED   +--------------+
 |              +---+----^---+              |
 |       Controller |    | Controller       v
+++         Publish |    | Unpublish       +++
|X|          Volume |    | Volume          | |
+-+             +---v----+---+             +-+
                | NODE_READY |
                +---+----^---+
               Node |    | Node
              Stage |    | Unstage
             Volume |    | Volume
                +---v----+---+
                |  VOL_READY |
                +---+----^---+
               Node |    | Node
            Publish |    | Unpublish
             Volume |    | Volume
                +---v----+---+
                | PUBLISHED  |
                +------------+

The lifecycle of a dynamically provisioned volume, from
creation to destruction, when the Node Plugin advertises the
STAGE_UNSTAGE_VOLUME capability.
--endpoint:CSI endpoint (default "unix://tmp/csi.sock")
--metadatastorage:metadata persistence method [node|k8s_configmap]
rbdplugin --nodeid=master-node --endpoint=unix:///csi/csi.sock --v=5 --drivername=rbd.csi.ceph.com --containerized=true --metadatastorage=k8s_configmap
根据类型(rbd),目录为 /var/lib/kubelet/plugins/rbd,driver name为 rbd.csi.ceph.com
// Excerpt from the plugin entry point: dispatch on the requested driver type.
switch driverType {
case rbdType:
// Append the driver name to the rbd plugin folder so the folder is
// specific to this driver name.
rbd.PluginFolder = rbd.PluginFolder + dname
// Prepare the storage path and obtain the cache persister selected by the
// metadataStorage flag.
cp, err := util.CreatePersistanceStorage(
rbd.PluginFolder, *metadataStorage, dname)
if err != nil {
// NOTE(review): err is not printed here; presumably
// CreatePersistanceStorage has already logged it - confirm.
os.Exit(1)
}
driver := rbd.NewDriver()
// Hand control to the driver together with the cache persister.
driver.Run(dname, *nodeID, *endpoint, *configRoot, *containerized, cp)
在/var/lib/kubelet/plugins/rbd.csi.ceph.com创建目录 controller与node
// CreatePersistanceStorage prepares the storage paths for the plugin and
// builds the metadata cache persister.
//
// It calls createPersistentStorage for the "controller" and "node" entries
// under sPath, then asks NewCachePersister for the implementation selected by
// metaDataStore. Every failure is logged and returned with a nil persister.
func CreatePersistanceStorage(sPath, metaDataStore, driverName string) (CachePersister, error) {
	controllerDir := path.Join(sPath, "controller")
	if err := createPersistentStorage(controllerDir); err != nil {
		klog.Errorf("failed to create persistent storage for controller: %v", err)
		return nil, err
	}

	nodeDir := path.Join(sPath, "node")
	if err := createPersistentStorage(nodeDir); err != nil {
		klog.Errorf("failed to create persistent storage for node: %v", err)
		return nil, err
	}

	persister, err := NewCachePersister(metaDataStore, driverName)
	if err != nil {
		klog.Errorf("failed to define cache persistence method: %v", err)
		return nil, err
	}
	return persister, nil
}
根据启动参数metadatastorage创建cache
// NewCachePersister returns the CachePersister implementation selected by
// metadataStore.
//
// Supported values:
//   - "k8s_configmap": a K8sCMCache built from NewK8sClient and
//     GetK8sNamespace.
//   - "node": a NodeCache rooted at PluginFolder/driverName that uses the
//     "controller" cache directory.
//
// Any other value yields a nil persister and an error.
func NewCachePersister(metadataStore, driverName string) (CachePersister, error) {
	switch metadataStore {
	case "k8s_configmap":
		// The log prefix is spelled "cache-persister" to match the other
		// messages emitted by this function.
		klog.Infof("cache-persister: using kubernetes configmap as metadata cache persister")
		k8scm := &K8sCMCache{}
		k8scm.Client = NewK8sClient()
		k8scm.Namespace = GetK8sNamespace()
		return k8scm, nil
	case "node":
		klog.Infof("cache-persister: using node as metadata cache persister")
		nc := &NodeCache{}
		nc.BasePath = PluginFolder + "/" + driverName
		nc.CacheDir = "controller"
		return nc, nil
	default:
		return nil, errors.New("cache-persister: couldn't parse metadatastorage flag")
	}
}
保存k8s客户端连接,与namespace
// K8sCMCache is the metadata store returned by NewCachePersister when the
// "k8s_configmap" method is selected.
type K8sCMCache struct {
	// Client is the Kubernetes clientset the cache operates through.
	Client *k8s.Clientset
	// Namespace is the Kubernetes namespace the cache is bound to.
	Namespace string
}
使用/var/lib/kubelet/plugins/rbd.csi.ceph.com目录
// NodeCache is the metadata store returned by NewCachePersister when the
// "node" method is selected.
type NodeCache struct {
	// BasePath is the root path of the cache; NewCachePersister sets it to
	// the plugin folder joined with the driver name.
	BasePath string
	// CacheDir is the cache directory name; NewCachePersister sets it to
	// "controller".
	CacheDir string
}
路径pkg/rbd/rbd.go
启动非阻塞grpc controller
// Run starts a non-blocking gRPC controller, node and identity server for
// the rbd CSI driver, which can serve multiple parallel requests.
//
// NOTE(review): only the opening of the function is shown in this excerpt;
// the visible part declares err and logs the driver name and version.
func (r *Driver) Run(driverName, nodeID, endpoint, configRoot string, containerized bool, cachePersister util.CachePersister) {
var err error
klog.Infof("Driver: %v version: %v", driverName, version)
--configroot参数代表ceph cluster配置目录,k8s_objects代表k8s secrets,默认为 /etc/csi-config
// NewConfigStore builds the ConfigStore selected by configRoot.
//
// The literal value "k8s_objects" selects a K8sConfig backend initialised
// from NewK8sClient and GetK8sNamespace. Any other value is treated as a
// directory path: it is cleaned with path.Clean and used as the base path of
// a FileConfig backend. The returned error is always nil.
func NewConfigStore(configRoot string) (*ConfigStore, error) {
	if configRoot == "k8s_objects" {
		klog.Infof("cache-store: using k8s objects as config store")
		k8sBackend := &K8sConfig{}
		k8sBackend.Client = NewK8sClient()
		k8sBackend.Namespace = GetK8sNamespace()
		return &ConfigStore{k8sBackend}, nil
	}

	klog.Infof("cache-store: using files in path (%s) as config store", configRoot)
	fileBackend := &FileConfig{}
	fileBackend.BasePath = path.Clean(configRoot)
	return &ConfigStore{fileBackend}, nil
}
// CSIDriver holds the identifying information of a CSI driver together with
// the capability lists it carries.
type CSIDriver struct {
	// name is the driver name passed to NewCSIDriver.
	name string
	// nodeID is the node identifier passed to NewCSIDriver.
	nodeID string
	// version is the driver version passed to NewCSIDriver.
	version string
	// cap lists controller service capabilities; NewCSIDriver leaves it nil.
	cap []*csi.ControllerServiceCapability
	// vc lists volume capability access modes; NewCSIDriver leaves it nil.
	vc []*csi.VolumeCapability_AccessMode
}
// NewCSIDriver allocates a CSIDriver and records its name, version and node
// ID; the capability lists are left at their zero value.
//
// As in the original notes: the vendor version is assumed to equal the driver
// version, and the optional driver plugin info manifest field is not
// supported. Refer to the CSI spec for more details.
func NewCSIDriver(name string, v string, nodeID string) *CSIDriver {
	d := new(CSIDriver)
	d.name = name
	d.version = v
	d.nodeID = nodeID
	return d
}
身份服务:Node Plugin和Controller Plugin都必须实现这些RPC集。协调kubernetes与csi的版本信息,
负责对外暴露这个插件的信息
// NewIdentityServer creates the identity server of the rbd CSI driver, backed
// by the default identity server that csicommon builds for d.
func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer {
	ids := &IdentityServer{}
	ids.DefaultIdentityServer = csicommon.NewDefaultIdentityServer(d)
	return ids
}
// NewNodeServer creates the node server of the rbd CSI driver.
//
// It starts with the mounter returned by mount.New(""). When containerized is
// true, that mounter is replaced by an nsenter-based one built from
// nsenter.DefaultHostRootFsPath; a failure to create the nsenter helper is
// returned to the caller with a nil server.
func NewNodeServer(d *csicommon.CSIDriver, containerized bool) (*NodeServer, error) {
	hostMounter := mount.New("")
	if containerized {
		nsEnter, err := nsenter.NewNsenter(nsenter.DefaultHostRootFsPath, exec.New())
		if err != nil {
			return nil, err
		}
		hostMounter = mount.NewNsenterMounter("", nsEnter)
	}

	server := &NodeServer{}
	server.DefaultNodeServer = csicommon.NewDefaultNodeServer(d)
	server.mounter = hostMounter
	return server, nil
}
// NewControllerServer creates the controller server of the rbd CSI driver,
// backed by the default controller server that csicommon builds for d and
// using cachePersister as its metadata store.
func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer {
	server := &ControllerServer{}
	server.DefaultControllerServer = csicommon.NewDefaultControllerServer(d)
	server.MetadataStore = cachePersister
	return server
}
从元数据存储(如k8s configmap)中读取volume与snapshot信息
// LoadExDataFromMetadataStore loads the rbd volume and snapshot info from the
// metadata store into the package-level rbdVolumes and rbdSnapshots maps,
// keyed by the identifier handed to each ForAll callback.
//
// Loading is best-effort: the results of the ForAll calls are deliberately
// ignored and the function always returns nil.
func (cs *ControllerServer) LoadExDataFromMetadataStore() error {
	// vol is the single destination object handed to ForAll. Storing the
	// pointer itself would make every map entry alias this one object, so
	// each callback stores its own copy instead.
	// NOTE(review): assumes ForAll fills the destination object for the
	// current entry before invoking the callback - confirm against the
	// CachePersister implementations.
	vol := &rbdVolume{}
	// nolint
	cs.MetadataStore.ForAll("csi-rbd-vol-", vol, func(identifier string) error {
		loaded := *vol
		rbdVolumes[identifier] = &loaded
		return nil
	})

	// Same aliasing concern as above, for snapshots.
	snap := &rbdSnapshot{}
	// nolint
	cs.MetadataStore.ForAll("csi-rbd-(.*)-snap-", snap, func(identifier string) error {
		loaded := *snap
		rbdSnapshots[identifier] = &loaded
		return nil
	})

	klog.Infof("Loaded %d volumes and %d snapshots from metadata store", len(rbdVolumes), len(rbdSnapshots))
	return nil
}
endpoint为unix:///csi/csi.sock
// Create the non-blocking gRPC server, start it on the endpoint with the
// driver's ids, cs and ns servers (presumably identity, controller and node -
// confirm), then wait on it.
s := csicommon.NewNonBlockingGRPCServer()
s.Start(endpoint, r.ids, r.cs, r.ns)
s.Wait()
主要逻辑函数,首先调用rbdStatus执行命令,
# rbd status pvc-d0223724-75f7-11e9-be7d-0800271c9f15 --pool rbd
Watchers: none
若没有watcher,则调用createRBDImage创建RBD image(若请求带有VolumeContentSource,则改为调用checkSnapshot)
rbd create pvc-xxxxxxxxxxxxxxx --size xxx --pool xxxx --id xxx -m xxx --key=xxx --image-format xxxx
// checkRBDStatus makes sure the backing image for rbdVol exists.
//
// It queries rbdStatus with the user ID and the request secrets. When that
// reports "found", nothing else is done. Otherwise the request is served in
// one of two ways:
//   - a non-nil VolumeContentSource means the request is for a snapshot, so
//     the work is delegated to checkSnapshot and its result returned as is;
//   - otherwise a new image of volSizeMiB is created with the admin ID, and a
//     creation failure is reported as a gRPC Internal error.
func (cs *ControllerServer) checkRBDStatus(rbdVol *rbdVolume, req *csi.CreateVolumeRequest, volSizeMiB int) error {
	// Check if there is already RBD image with requested name.
	// The remaining results of rbdStatus are deliberately ignored.
	//nolint
	found, _, _ := rbdStatus(rbdVol, rbdVol.UserID, req.GetSecrets())
	if found {
		return nil
	}

	// A non-nil VolumeContentSource means this request is for a snapshot.
	if req.VolumeContentSource != nil {
		return cs.checkSnapshot(req, rbdVol)
	}

	if createErr := createRBDImage(rbdVol, volSizeMiB, rbdVol.AdminID, req.GetSecrets()); createErr != nil {
		klog.Warningf("failed to create volume: %v", createErr)
		return status.Error(codes.Internal, createErr.Error())
	}
	klog.V(4).Infof("create volume %s", rbdVol.VolName)
	return nil
}
根据--metadatastorage参数,如果指定k8s_configmap,则调用client-go创建configmap
// storeVolumeMetadata persists vol in the cache persister cp under the
// volume's VolID. A failure is logged and returned unchanged.
func storeVolumeMetadata(vol *rbdVolume, cp util.CachePersister) error {
	err := cp.Create(vol.VolID, vol)
	if err == nil {
		return nil
	}
	klog.Errorf("failed to store metadata for volume %s: %v", vol.VolID, err)
	return err
}
# kubectl get cm csi-rbd-vol-14bb647a-7608-11e9-88ff-8288f6d6cbd1 -nkube-csi -o yaml
apiVersion: v1
data:
content: '{"volName":"pvc-1589da3b-7608-11e9-be7d-0800271c9f15","volID":"csi-rbd-vol-14bb647a-7608-11e9-88ff-8288f6d6cbd1","monitors":"192.168.74.57:6789","monValueFromSecret":"","pool":"rbd","imageFormat":"2","imageFeatures":"layering","volSize":1073741824,"adminId":"admin","userId":"admin","mounter":"rbd","disableInUseChecks":false,"clusterId":""}'
kind: ConfigMap
metadata:
creationTimestamp: "2019-05-14T05:21:25Z"
labels:
com.ceph.ceph-csi/metadata: csi-metadata
name: csi-rbd-vol-14bb647a-7608-11e9-88ff-8288f6d6cbd1
namespace: kube-csi
resourceVersion: "515518"
selfLink: /api/v1/namespaces/kube-csi/configmaps/csi-rbd-vol-14bb647a-7608-11e9-88ff-8288f6d6cbd1
uid: 1e7c0d87-7608-11e9-be7d-0800271c9f15
NodePublishVolume mounts the volume mounted to the device path to the target path
{"secrets":"***stripped***","target_path":"/var/lib/kubelet/pods/f4a98880-761b-11e9-be7d-0800271c9f15/volumes/kubernetes.io~csi/pvc-1589da3b-7608-11e9-be7d-0800271c9f15/mount","volume_capability":{"AccessType":{"Mount":{"fs_type":"ext4"}},"access_mode":{"mode":1}}
路径/sys/bus/rbd/devices/x 目录中的name 与pool,与rook flexvolume插件实现类似
// getRbdDevFromImageAndPool searches /sys/bus/rbd/devices for an rbd device
// whose "pool" and "name" files match pool and image (compared after trimming
// surrounding whitespace).
//
// It returns the matching /dev/rbd<X> path and true when that path also
// passes os.Lstat. It returns "" and false when the sysfs directory cannot be
// read or no entry qualifies. Entries that cannot be read or do not match are
// logged at verbosity 4 and skipped.
func getRbdDevFromImageAndPool(pool string, image string) (string, bool) {
	// Layout: /sys/bus/rbd/devices/X/name and /sys/bus/rbd/devices/X/pool
	const sysPath = "/sys/bus/rbd/devices"

	entries, err := ioutil.ReadDir(sysPath)
	if err != nil {
		return "", false
	}

	for _, entry := range entries {
		// Pool and name format:
		// see rbd_pool_show() and rbd_name_show() at
		// https://github.com/torvalds/linux/blob/master/drivers/block/rbd.c
		devID := entry.Name()

		// The pool must match first.
		poolFile := path.Join(sysPath, devID, "pool")
		// #nosec
		rawPool, readErr := ioutil.ReadFile(poolFile)
		if readErr != nil {
			klog.V(4).Infof("error reading %s: %v", poolFile, readErr)
			continue
		}
		if strings.TrimSpace(string(rawPool)) != pool {
			klog.V(4).Infof("device %s is not %q: %q", devID, pool, string(rawPool))
			continue
		}

		// Then the image name.
		imgFile := path.Join(sysPath, devID, "name")
		// #nosec
		rawImage, readErr := ioutil.ReadFile(imgFile)
		if readErr != nil {
			klog.V(4).Infof("error reading %s: %v", imgFile, readErr)
			continue
		}
		if strings.TrimSpace(string(rawImage)) != image {
			klog.V(4).Infof("device %s is not %q: %q", devID, image, string(rawImage))
			continue
		}

		// Pool and image match; only report the device if its node exists.
		devicePath := "/dev/rbd" + devID
		if _, statErr := os.Lstat(devicePath); statErr != nil {
			continue
		}
		return devicePath, true
	}
	return "", false
}
主要是format与mount操作,挂载在node节点的路径下/dev/rbd0 to /var/lib/kubelet/pods/f4a98880-761b-11e9-be7d-0800271c9f15/volumes/kubernetes.io~csi/pvc-1589da3b-7608-11e9-be7d-0800271c9f15/mount
target /var/lib/kubelet/pods/f4a98880-761b-11e9-be7d-0800271c9f15/volumes/kubernetes.io~csi/pvc-1589da3b-7608-11e9-be7d-0800271c9f15/mount
isBlock false
fstype ext4
device /dev/rbd0
readonly false
attributes map[storage.kubernetes.io/csiProvisionerIdentity:1557810988918-8081- adminid:admin imageFeatures:layering imageFormat:2 monitors:192.168.74.57:6789 pool:rbd]
mountflags []
// mountVolume publishes devicePath at the target path named in req.
//
// The request parameters are logged at verbosity 4. A block-mode request is
// mounted with the single "bind" option. Any other request goes through
// FormatAndMount, with the "ro" option added when the request is read-only.
// The error from the mount call is returned unchanged.
func (ns *NodeServer) mountVolume(req *csi.NodePublishVolumeRequest, devicePath string) error {
	// Publish Path
	volCap := req.GetVolumeCapability()
	mnt := volCap.GetMount()

	fsType := mnt.GetFsType()
	readOnly := req.GetReadonly()
	attrib := req.GetVolumeContext()
	mountFlags := mnt.GetMountFlags()
	isBlock := volCap.GetBlock() != nil
	targetPath := req.GetTargetPath()

	klog.V(4).Infof("target %v\nisBlock %v\nfstype %v\ndevice %v\nreadonly %v\nattributes %v\n mountflags %v\n",
		targetPath, isBlock, fsType, devicePath, readOnly, attrib, mountFlags)

	formatter := &mount.SafeFormatAndMount{Interface: ns.mounter, Exec: mount.NewOsExec()}

	if isBlock {
		return formatter.Mount(devicePath, targetPath, fsType, []string{"bind"})
	}

	opts := []string{}
	if readOnly {
		opts = append(opts, "ro")
	}
	return formatter.FormatAndMount(devicePath, targetPath, fsType, opts)
}
// ControllerPublishVolume does not inspect the request: it always answers
// with an empty response and a nil error.
func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
	resp := new(csi.ControllerPublishVolumeResponse)
	return resp, nil
}
# Image for the combined cephcsi binary, based on the ceph/ceph v14.2 image.
FROM ceph/ceph:v14.2
LABEL maintainers="Ceph-CSI Authors"
LABEL description="Ceph-CSI Plugin"

# Install location of the plugin binary inside the image.
ENV CSIBIN=/usr/local/bin/cephcsi

COPY cephcsi $CSIBIN

# Make the binary executable and expose it under the rbd and cephfs names.
RUN chmod +x $CSIBIN && \
    ln -sf $CSIBIN /usr/local/bin/cephcsi-rbd && \
    ln -sf $CSIBIN /usr/local/bin/cephcsi-cephfs

ENTRYPOINT ["/usr/local/bin/cephcsi"]
# Image for the standalone rbdplugin binary, based on CentOS 7.
FROM centos:7
LABEL maintainers="Kubernetes Authors"
LABEL description="RBD CSI Plugin"

ENV CEPH_VERSION "mimic"

# Install the Ceph client tools and filesystem utilities, then drop yum caches.
RUN yum install -y centos-release-ceph && \
    yum install -y ceph-common e2fsprogs xfsprogs rbd-nbd && \
    yum clean all

COPY rbdplugin /rbdplugin
RUN chmod +x /rbdplugin
ENTRYPOINT ["/rbdplugin"]