https://www.cnblogs.com/oolo/p/11672720.html
The two RPCs ListAndWatch and Allocate collect device information and prepare the device environment. Below, the device plugin is abbreviated DP and the device manager DM. ListAndWatch reports device information as faked device IDs plus a health flag; Allocate returns the information needed to start a container (env, mounts, devices, annotations).
Building the Docker image: the Dockerfile compiles the Go code into gpushare-device-plugin-v2, places it in /usr/bin/, and sets the Entrypoint to gpushare-device-plugin-v2 -logtostderr. Once the DaemonSet starts the container, gpushare-device-plugin-v2 -logtostderr is executed.
Startup
Main file: cmd/nvidia/main.go
// the entry point is here
ngm := nvidia.NewSharedGPUManager(*mps, *healthCheck, translatememoryUnits(*memoryUnit))
err := ngm.Run()
sharedGPUManager ---Run()---> NvidiaDevicePlugin ----Serve()----> start gRPC server and Register
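Serve() itself is short. Below is a hedged sketch of what it boils down to, assuming the plugin keeps its gRPC server and socket path in m.server and m.socket (the real code splits this across Serve/Start in gpushare-device-plugin/pkg/gpu/nvidia/server.go; needed packages are net, os, google.golang.org/grpc, and the pluginapi package quoted below):
func (m *NvidiaDevicePlugin) Serve() error {
	// remove any stale socket, then listen on the plugin's own Unix socket
	// (something like /var/lib/kubelet/device-plugins/aliyungpushare.sock)
	os.Remove(m.socket)
	lis, err := net.Listen("unix", m.socket)
	if err != nil {
		return err
	}
	m.server = grpc.NewServer()
	// expose ListAndWatch / Allocate / ... to the kubelet
	pluginapi.RegisterDevicePluginServer(m.server, m)
	go m.server.Serve(lis)
	// then register this plugin with the kubelet's own socket
	return m.Register(pluginapi.KubeletSocket, resourceName)
}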
The registration address is a constant and cannot be changed:
// gpushare-device-plugin/pkg/gpu/nvidia/server.go
import (
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1")
err = m.Register(pluginapi.KubeletSocket, resourceName)
pluginapi.KubeletSocket is a constant defined in kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/constants.go:
package v1beta1
const (
// Healthy means that the device is healthy
Healthy = "Healthy"
// UnHealthy means that the device is unhealthy
Unhealthy = "Unhealthy"
// Current version of the API supported by kubelet
Version = "v1beta1"
// DevicePluginPath is the folder the Device Plugin is expecting sockets to be on
// Only privileged pods have access to this path
// Note: Placeholder until we find a "standard path"
DevicePluginPath = "/var/lib/kubelet/device-plugins/"
// KubeletSocket is the path of the Kubelet registry socket
KubeletSocket = DevicePluginPath + "kubelet.sock"
// Timeout duration in secs for PreStartContainer RPC
KubeletPreStartContainerRPCTimeoutInSecs = 30
)
var SupportedVersions = [...]string{"v1beta1"}
resourceName is a constant defined in gpushare-device-plugin/pkg/gpu/nvidia/const.go:
const (
	resourceName = "aliyun.com/gpu-mem"
)
The registration request carries three parameters: the API version used between DM and DP, the name of the DP's Unix socket, and the resource type name.
conn, err := dial(kubeletEndpoint, 5*time.Second)
client := pluginapi.NewRegistrationClient(conn)
// all three are constants:
// Endpoint = aliyungpushare.sock
// pluginapi.Version = v1beta1
// resourceName = aliyun.com/gpu-mem
reqt := &pluginapi.RegisterRequest{
Version: pluginapi.Version,
Endpoint: path.Base(m.socket),
ResourceName: resourceName,
}
_, err = client.Register(context.Background(), reqt)
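The dial helper is essentially a gRPC dial over a Unix domain socket. A minimal sketch, assuming the grpc-go API of that era (WithInsecure/WithDialer; the real helper also lives in server.go):
import (
	"net"
	"time"

	"google.golang.org/grpc"
)

// dial opens a blocking, insecure gRPC connection to a Unix socket.
func dial(unixSocketPath string, timeout time.Duration) (*grpc.ClientConn, error) {
	return grpc.Dial(unixSocketPath,
		grpc.WithInsecure(),
		grpc.WithBlock(),
		grpc.WithTimeout(timeout),
		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
			// the kubelet and the DP only ever talk over Unix domain sockets
			return net.DialTimeout("unix", addr, timeout)
		}),
	)
}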
On the DM side, Register checks that the parameters are valid and creates an endpointImpl to manage the plugin. In kubernetes/pkg/kubelet/cm/devicemanager/manager.go it then runs
go m.addEndpoint(r)
Note that the Register RPC's response carries no data; an error is signalled only when the second return value is not nil:
// success
return &pluginapi.Empty{}, nil
// error
return &pluginapi.Empty{}, fmt.Errorf(errorString)
The detailed flow: addEndpoint then runs runEndpoint, which calls ListAndWatch once. So, once the DP has sent its registration, the DM builds a few structures: m.endpoints[resourceName] right away, and, immediately after the first ListAndWatch reply, m.healthyDevices[resourceName] and m.unhealthyDevices[resourceName].
ListAndWatch is invoked once at registration time, and only that once, because ListAndWatch is a long-lived gRPC stream through which the DP can keep pushing updates.
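What the DM does with each ListAndWatch response is essentially set bookkeeping. A hedged sketch of that bookkeeping (the real logic is the device update callback in kubernetes/pkg/kubelet/cm/devicemanager/manager.go; the helper name here is my own):
import (
	"k8s.io/apimachinery/pkg/util/sets"
	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
)

// updateDeviceSets sorts the devices reported by one ListAndWatch response
// into the healthy and unhealthy sets kept per resourceName.
func updateDeviceSets(healthy, unhealthy sets.String, devices []pluginapi.Device) {
	for _, d := range devices {
		if d.Health == pluginapi.Healthy {
			healthy.Insert(d.ID)
			unhealthy.Delete(d.ID)
		} else {
			unhealthy.Insert(d.ID)
			healthy.Delete(d.ID)
		}
	}
}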
See kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/api.proto:
rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}
message ListAndWatchResponse {
repeated Device devices = 1;
}
message Device {
// A unique ID assigned by the device plugin used
// to identify devices during the communication
// Max length of this field is 63 characters
string ID = 1;
// Health of the device, can be healthy or unhealthy, see constants.go
string health = 2;
}
As the proto shows, the DM's query takes no parameters, and the reply is a repeated list of device IDs plus health status. Note that gpushare-device-plugin reports faked device IDs. Suppose the machine has 8 GPUs with 16 GB of memory each: the natural assumption is that 8 GPU devices are reported, but that is wrong. gpushare-device-plugin actually reports 8*16 = 128 devices, each with a fake device ID.
This is a sprawling topic. In short, when the kubelet initializes, one of the steps is to initialize the DM; the DM exposes an Allocate function for the kubelet to call, and inside the DM the DP's Allocate RPC is invoked in turn.
When does the kubelet call the DM's Allocate? From kubernetes/pkg/kubelet/cm/devicemanager/types.go we know the kubelet must, one way or another, call the following function:
type Manager interface {
// Allocate configures and assigns devices to pods. The pods are provided
// through the pod admission attributes in the attrs argument. From the
// requested device resources, Allocate will communicate with the owning
// device plugin to allow setup procedures to take place, and for the
// device plugin to provide runtime settings to use the device (environment
// variables, mount points and device files). The node object is provided
// for the device manager to update the node capacity to reflect the
// currently available devices.
Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
...
}
A bit of digging confirms the guess, though the path is slightly convoluted: the kubelet calls AddPodAdmitHandler to add a series of PodAdmitHandler implementations into its core field klet.admitHandlers; among them, lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources) is added as well.
// note: PredicateAdmitHandler implements the Admit method, so a pointer to a PredicateAdmitHandler instance can be passed directly to AddPodAdmitHandler
type PodAdmitHandler interface {
// Admit evaluates if a pod can be admitted.
Admit(attrs *PodAdmitAttributes) PodAdmitResult
}
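To make the interface concrete, here is a toy PodAdmitHandler of my own invention (the PodAdmitAttributes/PodAdmitResult field names follow the lifecycle package; registration would go through the same AddPodAdmitHandler path as PredicateAdmitHandler):
import (
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
)

type demoAdmitHandler struct{}

// Admit rejects pods that declare no containers and admits everything else.
func (h *demoAdmitHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
	if len(attrs.Pod.Spec.Containers) == 0 {
		return lifecycle.PodAdmitResult{
			Admit:   false,
			Reason:  "NoContainers",
			Message: "pod declares no containers",
		}
	}
	return lifecycle.PodAdmitResult{Admit: true}
}

// registration (inside kubelet setup) would look like: klet.admitHandlers.AddPodAdmitHandler(&demoAdmitHandler{})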
Inside syncLoop, syncLoopIteration reacts to a new pod request by calling handler.HandlePodAdditions(u.Pods):
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
klog.Errorf("Update channel is closed. Exiting the sync loop.")
return false
}
switch u.Op {
case kubetypes.ADD:
klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
// from reading the source, handler here is kl (the Kubelet) itself
handler.HandlePodAdditions(u.Pods)
HandlePodAdditions calls kl.canAdmitPod(activePods, pod), and inside canAdmitPod:
// podAdmitHandler.Admit is invoked; podAdmitHandler is an interface, so the concrete type determines which implementation runs
attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
for _, podAdmitHandler := range kl.admitHandlers {
if result := podAdmitHandler.Admit(attrs); !result.Admit {
return false, result.Reason, result.Message
}
}
For PredicateAdmitHandler.Admit, the handler calls err = w.pluginResourceUpdateFunc(nodeInfo, attrs), and pluginResourceUpdateFunc is the previously registered klet.containerManager.UpdatePluginResources.
klet.containerManager.UpdatePluginResources in turn ends up calling:
// at last, the DM's Allocate method is reached
func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
return cm.deviceManager.Allocate(node, attrs)
}
The narrative above runs forward, from pod creation down to the eventual call into Allocate, but the exploration usually works backwards: start from Allocate and look for its callers. The trickiest part is figuring out who calls UpdatePluginResources; a global search only turns up the place where it is registered, never a call site. To recap the chain: UpdatePluginResources is registered into PredicateAdmitHandler; PredicateAdmitHandler is added to klet.admitHandlers; klet.admitHandlers invokes each podAdmitHandler.Admit in turn; and PredicateAdmitHandler.Admit calls pluginResourceUpdateFunc, which is exactly the registered UpdatePluginResources.
Which big and small loops the kubelet runs after startup can be analyzed another time. At minimum, creating a pod really does end up calling Allocate.
The DM's Allocate in detail
The Allocate code, in kubernetes/pkg/kubelet/cm/devicemanager/manager.go:
func (m *ManagerImpl) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
pod := attrs.Pod
err := m.allocatePodResources(pod)
if err != nil {
klog.Errorf("Failed to allocate device plugin resource for pod %s: %v", string(pod.UID), err)
return err
}
m.mutex.Lock()
defer m.mutex.Unlock()
// quick return if no pluginResources requested
if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
return nil
}
m.sanitizeNodeAllocatable(node)
return nil
}
After reading Allocate and its parameters, there are three things to understand:
- err := m.allocatePodResources(pod): fills m.allocatedDevices with the pre-requested devices and writes the DP's Allocate reply into m.podDevices
- m.podDevices: the per-pod, per-container record of which devices (and which DP response) belong to it
- m.sanitizeNodeAllocatable(node): writes newAllocatableResource back into the node structure
In summary, Allocate does not return any information; it only performs a pre-allocation by updating the DM's internal data structures. Note that the DP's Allocate RPC is called along the way, but its response is, for now, merely stored in m.podDevices. To repeat: this function only pre-allocates resources; how and when they are actually used needs further analysis.
Below is a detailed analysis of Allocate.
m.allocatePodResources(pod) in kubernetes/pkg/kubelet/cm/devicemanager/manager.go
func (m *ManagerImpl) allocatePodResources(pod *v1.Pod) error {
// note: devicesToReuse starts out empty, with nothing in it
devicesToReuse := make(map[string]sets.String)
// for pod.Spec.InitContainers see: https://github.com/kubernetes/api/blob/master/core/v1/types.go
// InitContainers are the pod's very first containers (e.g. they set up networking)
for _, container := range pod.Spec.InitContainers {
// fill `m.allocatedDevices` with the pre-requested devices
// call the DP's allocate RPC and write the returned information into `m.podDevices`
// essentially, up to this point only two DM data structures have changed; nothing else has happened
if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
return err
}
// in kubernetes/pkg/kubelet/cm/devicemanager/pod_devices.go
// copy the pod + container resources recorded in `m.podDevices` into `devicesToReuse`; note this runs inside the for loop, so devicesToReuse carries over between init containers
m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
}
for _, container := range pod.Spec.Containers {
// same as above, but for the pod's real (non-init) containers
if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
return err
}
m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
}
return nil
}
allocateContainerResources in kubernetes/pkg/kubelet/cm/devicemanager/manager.go
func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {
// the devices to allocate are returned in allocDevices and have already been recorded in m.allocatedDevices
allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
// look up the gRPC endpoint for this resource (resourceName is aliyun.com/gpu-mem)
eI, ok := m.endpoints[resource]
// call the DP's allocate RPC
resp, err := eI.e.allocate(devs)
// write the result into m.podDevices
m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0])
devicesToAllocate returns the set of devices that still need to be allocated:
func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int, reusableDevices sets.String) (sets.String, error) {
// containerDevices is in `kubernetes/pkg/kubelet/cm/devicemanager/pod_devices.go`
// this function returns the list of devices to allocate:
// it draws from reusableDevices first; if that is enough it returns, otherwise it adds brand-new devices on top
// if no device list needs to be returned, it returns nil
devices := m.podDevices.containerDevices(podUID, contName, resource)
// Allocates from reusableDevices list first.
for device := range reusableDevices {
devices.Insert(device)
needed--
if needed == 0 {
return devices, nil
}
}
// (abridged) `allocated` is the set of additional devices picked from the healthy, not-yet-allocated pool
for _, device := range allocated {
m.allocatedDevices[resource].Insert(device)
devices.Insert(device)
}
return devices, nil
sanitizeNodeAllocatable in kubernetes/pkg/kubelet/cm/devicemanager/manager.go
func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulernodeinfo.NodeInfo) {
var newAllocatableResource *schedulernodeinfo.Resource
allocatableResource := node.AllocatableResource()
if allocatableResource.ScalarResources == nil {
allocatableResource.ScalarResources = make(map[v1.ResourceName]int64)
}
for resource, devices := range m.allocatedDevices {
needed := devices.Len()
quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)]
if ok && int(quant) >= needed {
continue
}
// Needs to update nodeInfo.AllocatableResource to make sure
// NodeInfo.allocatableResource at least equal to the capacity already allocated.
if newAllocatableResource == nil {
newAllocatableResource = allocatableResource.Clone()
}
newAllocatableResource.ScalarResources[v1.ResourceName(resource)] = int64(needed)
}
// write the updated allocatable back into the node object, i.e. n.allocatableResource = newAllocatableResource
if newAllocatableResource != nil {
node.SetAllocatableResource(newAllocatableResource)
}
}
The DP side's Allocate parameters are relatively easy; just look at kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/api.proto:
rpc Allocate(AllocateRequest) returns (AllocateResponse) {}
// the request struct AllocateRequest is, in short, just a bunch of device IDs
message AllocateRequest {
repeated ContainerAllocateRequest container_requests = 1;
}
message ContainerAllocateRequest {
repeated string devicesIDs = 1;
}
// The response struct AllocateResponse is, in short, what is handed to the container: environment variables, directory mounts, device mappings, and annotations; each mount and device consists of container_path, host_path, and permissions.
message AllocateResponse {
repeated ContainerAllocateResponse container_responses = 1;
}
message ContainerAllocateResponse {
// List of environment variable to be set in the container to access one of more devices.
map<string, string> envs = 1;
// Mounts for the container.
repeated Mount mounts = 2;
// Devices for the container.
repeated DeviceSpec devices = 3;
// Container annotations to pass to the container runtime
map<string, string> annotations = 4;
}
message Mount {
// Path of the mount within the container.
string container_path = 1;
// Path of the mount on the host.
string host_path = 2;
// If set, the mount is read-only.
bool read_only = 3;
}
// DeviceSpec specifies a host device to mount into a container.
message DeviceSpec {
// Path of the device within the container.
string container_path = 1;
// Path of the device on the host.
string host_path = 2;
// Cgroups permissions of the device, candidates are one or more of
// * r - allows container to read from the specified device.
// * w - allows container to write to the specified device.
// * m - allows container to create device files that do not yet exist.
string permissions = 3;
}
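To make these message shapes concrete, here is a minimal sketch of a DP's Allocate building one ContainerAllocateResponse per ContainerAllocateRequest. The env var name and the device/mount paths are illustrative assumptions; gpushare-device-plugin's real logic follows below.
import (
	"strings"

	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
)

func buildAllocateResponse(reqs *pluginapi.AllocateRequest) *pluginapi.AllocateResponse {
	responses := &pluginapi.AllocateResponse{}
	for _, req := range reqs.ContainerRequests {
		resp := &pluginapi.ContainerAllocateResponse{
			// environment variables visible inside the container
			Envs: map[string]string{
				"EXAMPLE_ASSIGNED_DEVICE_IDS": strings.Join(req.DevicesIDs, ","),
			},
			// device nodes to expose, with cgroup permissions
			Devices: []*pluginapi.DeviceSpec{{
				ContainerPath: "/dev/nvidia0",
				HostPath:      "/dev/nvidia0",
				Permissions:   "rw",
			}},
			// host directories to bind-mount read-only
			Mounts: []*pluginapi.Mount{{
				ContainerPath: "/usr/local/nvidia",
				HostPath:      "/usr/local/nvidia",
				ReadOnly:      true,
			}},
		}
		responses.ContainerResponses = append(responses.ContainerResponses, resp)
	}
	return responses
}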
Back to a core question: how many devices does the DM see through the DP's ListAndWatch interface? Take a machine with 8 GPUs of 16 GB each. The natural assumption is 8 GPU devices, but that is wrong: gpushare-device-plugin reports 8*16 = 128 devices, each with a fake device ID:
// gpushare-device-plugin/pkg/gpu/nvidia/nvidia.go, func getDevices()
fakeID := generateFakeDeviceID(d.UUID, j)
devs = append(devs, &pluginapi.Device{
	ID:     fakeID,
	Health: pluginapi.Healthy,
})
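A hedged sketch of the fake-ID scheme: the real GPU UUID is combined with a per-memory-unit counter, so every GiB of a card becomes one reported "device". The exact separator below is an assumption; the real helper lives next to getDevices() in nvidia.go.
import "fmt"

// generateFakeDeviceID derives one fake device ID per (GPU, memory-unit) pair.
func generateFakeDeviceID(realUUID string, counter uint) string {
	return fmt.Sprintf("%s-_-%d", realUUID, counter)
}
getDevices loops over every GPU and every GiB of its memory, which is exactly why an 8-GPU, 16 GiB-per-card machine reports 8*16 = 128 devices.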
With that question settled, let's look at how the DP, taking gpushare-device-plugin as the example, handles Allocate:
// Allocate which return list of devices.
func (m *NvidiaDevicePlugin) Allocate(ctx context.Context,
reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
// a guess: inside Allocate, picking the card should be easy; presumably the multiple faked device IDs all map back to one real card, and if they did not belong to the same card, the scheduler must have made a mistake!
// in reality the DP never does any scheduling itself; it learns which real card the faked device IDs map to from the annotation the scheduler added
responses := pluginapi.AllocateResponse{}
log.Infoln("----Allocating GPU for gpu mem is started----")
var (
podReqGPU uint
found bool
assumePod *v1.Pod
)
// podReqGPU = uint(0)
// note: Allocate is a pod-level request, not an independent request per container, so podReqGPU is the total number of faked cards the whole pod needs
for _, req := range reqs.ContainerRequests {
podReqGPU += uint(len(req.DevicesIDs))
}
// how many faked GPU cards are requested
log.Infof("RequestPodGPUs: %d", podReqGPU)
m.Lock()
defer m.Unlock()
log.Infoln("checking...")
// this part deserves a careful look
// query the k8s API for all pods that are pending and should be handled by the node this DP runs on
// that is exactly what getCandidatePods does
pods, err := getCandidatePods()
if err != nil {
log.Infof("invalid allocation requst: Failed to find candidate pods due to %v", err)
return buildErrResponse(reqs, podReqGPU), nil
}
if log.V(4) {
for _, pod := range pods {
log.Infof("Pod %s in ns %s request GPU Memory %d with timestamp %v",
pod.Name,
pod.Namespace,
getGPUMemoryFromPodResource(pod),
getAssumeTimeFromPodAnnotation(pod))
}
}
for _, pod := range pods {
// compare each candidate pod's requested GPU memory with this request; if they match, assume this is the pod that sent the request (not entirely rigorous?)
// strictly speaking, the DM <-> DP protocol does not tell the DP which pod it is serving, so why does gpushare-device-plugin need to know?
if getGPUMemoryFromPodResource(pod) == podReqGPU {
log.Infof("Found Assumed GPU shared Pod %s in ns %s with GPU Memory %d",
pod.Name,
pod.Namespace,
podReqGPU)
assumePod = pod
found = true
break
}
}
if found {
// read the `ALIYUN_COM_GPU_MEM_IDX` annotation from the pod; e.g. 0 means use card 0
// presumably this annotation is written at scheduling time
// otherwise the allocation fails with an error
id := getGPUIDFromPodAnnotation(assumePod)
if id < 0 {
log.Warningf("Failed to get the dev ", assumePod)
}
candidateDevID := ""
if id >= 0 {
ok := false
// resolve the device name
candidateDevID, ok = m.GetDeviceNameByIndex(uint(id))
if !ok {
log.Warningf("Failed to find the dev for pod %v because it's not able to find dev with index %d",
assumePod,
id)
id = -1
}
}
if id < 0 {
return buildErrResponse(reqs, podReqGPU), nil
}
// 1. Create container requests
// build the response to return
for _, req := range reqs.ContainerRequests {
// reqGPU is what each individual container needs; do not confuse it with podReqGPU
reqGPU := uint(len(req.DevicesIDs))
// the environment variables passed in:
/*
NVIDIA_VISIBLE_DEVICES: e.g. GPU0
ALIYUN_COM_GPU_MEM_IDX: 0 (GPU index)
ALIYUN_COM_GPU_MEM_POD: 10 (total GPU memory of the pod)
ALIYUN_COM_GPU_MEM_CONTAINER: 5 (GPU memory of one container within the pod)
ALIYUN_COM_GPU_MEM_DEV: 16 (total memory this card can provide)
*/
response := pluginapi.ContainerAllocateResponse{
Envs: map[string]string{
envNVGPU: candidateDevID,
EnvResourceIndex: fmt.Sprintf("%d", id),
EnvResourceByPod: fmt.Sprintf("%d", podReqGPU),
EnvResourceByContainer: fmt.Sprintf("%d", reqGPU),
EnvResourceByDev: fmt.Sprintf("%d", getGPUMemory()),
},
}
responses.ContainerResponses = append(responses.ContainerResponses, &response)
}
// 2. Update Pod spec
// modify the pod's annotations directly and build a new pod object:
// ALIYUN_COM_GPU_MEM_ASSIGNED = true
// ALIYUN_COM_GPU_MEM_ASSUME_TIME = timestamp
// rather brute force!
newPod := updatePodAnnotations(assumePod)
// make it take effect
_, err = clientset.CoreV1().Pods(newPod.Namespace).Update(newPod)
if err != nil {
// if the update fails, there is a retry path for the classic conflict error:
// "the object has been modified; please apply your changes to the latest version and try again"
if err.Error() == OptimisticLockErrorMsg {
// retry
pod, err := clientset.CoreV1().Pods(assumePod.Namespace).Get(assumePod.Name, metav1.GetOptions{})
if err != nil {
log.Warningf("Failed due to %v", err)
return buildErrResponse(reqs, podReqGPU), nil
}
newPod = updatePodAnnotations(pod)
_, err = clientset.CoreV1().Pods(newPod.Namespace).Update(newPod)
if err != nil {
log.Warningf("Failed due to %v", err)
return buildErrResponse(reqs, podReqGPU), nil
}
} else {
log.Warningf("Failed due to %v", err)
return buildErrResponse(reqs, podReqGPU), nil
}
}
} else {
log.Warningf("invalid allocation requst: request GPU memory %d can't be satisfied.",
podReqGPU)
// return &responses, fmt.Errorf("invalid allocation requst: request GPU memory %d can't be satisfied", reqGPU)
return buildErrResponse(reqs, podReqGPU), nil
}
log.Infof("new allocated GPUs info %v", &responses)
log.Infoln("----Allocating GPU for gpu mem is ended----")
// // Add this to make sure the container is created at least
// currentTime := time.Now()
// currentTime.Sub(lastAllocateTime)
// return what was built in step 1 (Create container requests)
return &responses, nil
}
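For reference, reading the card index back out of the pod annotation is presumably little more than the following (the annotation key is taken from the comments above; the real helper is getGPUIDFromPodAnnotation in the gpushare-device-plugin repo):
import (
	"strconv"

	v1 "k8s.io/api/core/v1"
)

// gpuIndexFromPod returns the physical GPU index recorded by the scheduler,
// or -1 when the annotation is missing or malformed.
func gpuIndexFromPod(pod *v1.Pod) int {
	value, ok := pod.Annotations["ALIYUN_COM_GPU_MEM_IDX"]
	if !ok {
		return -1
	}
	id, err := strconv.Atoi(value)
	if err != nil {
		return -1
	}
	return id
}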
To summarize this flow: the DP's Allocate returns these environment variables for the container:
NVIDIA_VISIBLE_DEVICES: e.g. GPU0
ALIYUN_COM_GPU_MEM_IDX: 0 (GPU index)
ALIYUN_COM_GPU_MEM_POD: 10 (total GPU memory of the pod)
ALIYUN_COM_GPU_MEM_CONTAINER: 5 (GPU memory of one container within the pod)
ALIYUN_COM_GPU_MEM_DEV: 16 (total memory this card can provide)
and it writes these annotations onto the pod:
ALIYUN_COM_GPU_MEM_ASSIGNED = true
ALIYUN_COM_GPU_MEM_ASSUME_TIME = timestamp
What ALIYUN_COM_GPU_MEM_ASSIGNED and the others are used for will be analyzed in a separate article.
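As a quick illustration of this contract from the container's point of view, a workload could read those variables like this (variable names as listed above; the program itself is hypothetical):
package main

import (
	"fmt"
	"os"
)

func main() {
	// which physical GPU the scheduler assigned
	idx := os.Getenv("ALIYUN_COM_GPU_MEM_IDX")
	// memory granted to this container versus the whole card
	granted := os.Getenv("ALIYUN_COM_GPU_MEM_CONTAINER")
	total := os.Getenv("ALIYUN_COM_GPU_MEM_DEV")
	fmt.Printf("container limited to %s of %s GiB on GPU index %s\n", granted, total, idx)
}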
On to health checking. It must be the DP that detects an abnormality first. Taking gpushare-device-plugin as the example, monitoring is wired up at startup in gpushare-device-plugin/pkg/gpu/nvidia/server.go:
func (m *NvidiaDevicePlugin) Start() error {
...
go m.healthcheck()
...
}
func (m *NvidiaDevicePlugin) healthcheck() {
ctx, cancel := context.WithCancel(context.Background())
var xids chan *pluginapi.Device
if m.healthCheck {
xids = make(chan *pluginapi.Device)
// watch all devices
go watchXIDs(ctx, m.devs, xids)
}
for {
select {
case <-m.stop:
cancel()
return
case dev := <-xids:
// if a device goes bad, call the unhealthy method, which writes the device into the signalling channel m.health
m.unhealthy(dev)
}
}
}
func watchXIDs(ctx context.Context, devs []*pluginapi.Device, xids chan<- *pluginapi.Device) {
watchXIDs registers for device events through the NVIDIA NVML call RegisterEventForDevice and listens for abnormal (XID) events; whenever one occurs it writes the affected device into the channel, in a never-ending loop.
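The unhealthy helper referenced above is presumably nothing more than a send on the channel that ListAndWatch reads (a sketch; the field name m.health follows the code below):
func (m *NvidiaDevicePlugin) unhealthy(dev *pluginapi.Device) {
	// hand the faulty device over to ListAndWatch via the health channel
	m.health <- dev
}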
Next, once m.unhealthy has flagged a faulty device, how does that get reported? Or does the DM poll ListAndWatch periodically? The ListAndWatch code in gpushare-device-plugin/pkg/gpu/nvidia/server.go shows that neither is the case:
func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})
// this loop is easy to overlook: the DM calls ListAndWatch right after the DP registers, and after the first Send the function enters an endless loop
// as soon as m.health receives something, the error is sent to the DM immediately and all the cards are marked as failed, which is certainly debatable
for {
select {
case <-m.stop:
return nil
case d := <-m.health:
// FIXME: there is no way to recover from the Unhealthy state.
d.Health = pluginapi.Unhealthy
s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})
}
}
}
Finally, does the DM really keep a goroutine waiting for a ListAndWatchResponse all the time? In short, yes: after the DM calls ListAndWatch it keeps reading from the stream, blocking in stream.Recv() when nothing new has arrived. When an unhealthy device comes in, the callback runs and performs m.unhealthyDevices[resourceName].Insert(dev.ID).
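A hedged sketch of that DM-side loop (the real code sits in the endpoint implementation under kubernetes/pkg/kubelet/cm/devicemanager/; the function here is a simplification of mine): one goroutine per registered plugin blocks in stream.Recv() and hands every update to a callback.
import (
	"context"

	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
)

func listAndWatchLoop(client pluginapi.DevicePluginClient, callback func([]*pluginapi.Device)) error {
	stream, err := client.ListAndWatch(context.Background(), &pluginapi.Empty{})
	if err != nil {
		return err
	}
	for {
		resp, err := stream.Recv() // blocks until the DP pushes a new device list
		if err != nil {
			return err // broken stream: the endpoint gets torn down
		}
		callback(resp.Devices) // refreshes m.healthyDevices / m.unhealthyDevices
	}
}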
We know that the kubelet's call into the DM's Allocate merely prepares the resources (it just fills a few maps; the resources are tracked in the DM, and the DP does not record resource usage). But when does the kubelet actually put these resources to use? The trail of the analysis:
- func (m *ManagerImpl) GetDeviceRunContainerOptions in kubernetes/pkg/kubelet/cm/devicemanager/manager.go returns the run options
- it is called by func (cm *containerManagerImpl) GetResources in kubernetes/pkg/kubelet/cm/container_manager_linux.go, which returns all the options
- which is called by func (kl *Kubelet) GenerateRunContainerOptions in kubernetes/pkg/kubelet/kubelet_pods.go
- which is called by func (m *kubeGenericRuntimeManager) generateContainerConfig in kubernetes/pkg/kubelet/kuberuntime/kuberuntime_container.go
- which is called by func (m *kubeGenericRuntimeManager) startContainer in kubernetes/pkg/kubelet/kuberuntime/kuberuntime_container.go
- which is called by func (m *kubeGenericRuntimeManager) SyncPod in kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go
- which is called by func (kl *Kubelet) syncPod in kubernetes/pkg/kubelet/kubelet.go
- func (m *manager) Start() in kubernetes/pkg/kubelet/status/status_manager.go: when an event arrives on m.podStatusChannel, SyncPod is triggered
- and the top of the chain is func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) in kubernetes/pkg/kubelet/kubelet.go
Allocate should come first and GetDeviceRunContainerOptions after it, but the analysis did not establish a strict ordering; that is left as the topic of the next write-up. A rough sketch of what the conversion amounts to follows below.
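GetDeviceRunContainerOptions looks up what Allocate stored in m.podDevices and converts the saved ContainerAllocateResponse into options the runtime understands. A sketch under that assumption (RunOptions is a hypothetical stand-in for the real DeviceRunContainerOptions type):
import (
	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
)

// RunOptions is a stand-in for the options handed to the container runtime.
type RunOptions struct {
	Envs        map[string]string
	Mounts      []*pluginapi.Mount
	Devices     []*pluginapi.DeviceSpec
	Annotations map[string]string
}

// optionsFromAllocateResponse shows the direction of the conversion: everything
// the DP returned during Allocate is replayed when the container is started.
func optionsFromAllocateResponse(resp *pluginapi.ContainerAllocateResponse) *RunOptions {
	return &RunOptions{
		Envs:        resp.Envs,        // e.g. the ALIYUN_COM_GPU_MEM_* variables
		Mounts:      resp.Mounts,      // host paths mapped into the container
		Devices:     resp.Devices,     // /dev entries with cgroup permissions
		Annotations: resp.Annotations, // passed through to the container runtime
	}
}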