k8s device plugin

危飞跃
2023-12-01

基本概念入门:

Device Manager Proposal

Device plugin official Doc(中文)

device-plugins official Doc(En)

 

Go through Intel FPGA Plugin code

1.  cmd/fpga_plugin/fpga_plugin.go

生成一个新的 plugin,传入的信息包括 sysfs、devfs 和 mode(共两种 mode:af 或 region)。

// Build the FPGA device plugin from the sysfs/devfs directories and the
// selected operation mode; abort on failure.
plugin, err := newDevicePlugin(sysfsDirectory, devfsDirectory, mode)
if err != nil {
fatal(err)
}

fmt.Println("FPGA device plugin started in ", mode, " mode")

// Hand the plugin to a Manager and run it; see Manager.Run for the
// event loop this call enters.
manager := dpapi.NewManager(namespace, plugin)
manager.Run()

2.  internal/deviceplugin/manager.go

会生成一个server, 然后run, 主要就是devicePlugin.Scan (具体到某个device),扫描设备信息,然后启动grpc Serve(handleUpdate)

// Manager manages life cycle of device plugins and handles the scan results
// received from them.
type Manager struct {
        // devicePlugin is the Scanner whose Scan method produces device updates.
        devicePlugin Scanner
        // namespace is passed to every server's Serve call.
        namespace    string
        // servers holds the plugin servers, keyed by device type.
        servers      map[string]devicePluginServer
        // createServer builds a server for a device type, given an optional
        // post-allocation hook; NewManager sets it to newServer.
        createServer func(string, func(*pluginapi.AllocateResponse) error) devicePluginServer
}

// NewManager returns a Manager bound to the given namespace and Scanner.
// It starts with an empty server table and uses newServer as its server
// factory.
func NewManager(namespace string, devicePlugin Scanner) *Manager {
        mgr := new(Manager)
        mgr.namespace = namespace
        mgr.devicePlugin = devicePlugin
        mgr.createServer = newServer
        mgr.servers = map[string]devicePluginServer{}
        return mgr
}

// Run starts the Scanner in a background goroutine and processes every update
// it publishes until the update channel is closed. A scan error terminates
// the process with exit code 1.
func (m *Manager) Run() {
        updates := make(chan updateInfo)

        scan := func() {
                if err := m.devicePlugin.Scan(newNotifier(updates)); err != nil {
                        fmt.Printf("Device scan failed: %+v\n", err)
                        os.Exit(1)
                }
                // Scan finished without error: let the event loop below end.
                close(updates)
        }
        go scan()

        for {
                update, open := <-updates
                if !open {
                        return
                }
                m.handleUpdate(update)
        }
}

handleUpdate 启动grpc 服务 m.servers[dt].Serve(m.namespace)

// handleUpdate applies one scan result to the set of plugin servers.
//
// For each device type in update.Added a server is created, stored in
// m.servers, started in its own goroutine and given the initial device list.
// Device types in update.Updated get their device list refreshed; device
// types in update.Removed are stopped and deleted from m.servers.
//
// A failing Serve call terminates the process with exit code 1.
func (m *Manager) handleUpdate(update updateInfo) {
        debug.Print("Received dev updates:", update)

        // Whether the plugin provides a post-allocation hook does not depend
        // on the device type, so resolve it once instead of per iteration.
        var postAllocate func(*pluginapi.AllocateResponse) error
        if postAllocator, ok := m.devicePlugin.(PostAllocator); ok {
                postAllocate = postAllocator.PostAllocate
        }

        for devType, devices := range update.Added {
                srv := m.createServer(devType, postAllocate)
                m.servers[devType] = srv

                // Pass the server itself to the goroutine. Looking it up in
                // m.servers from inside the goroutine would read the map
                // concurrently with the writes and deletes done by this
                // method, and would dereference a nil entry if the device type
                // were removed before the goroutine got scheduled.
                go func(dt string, srv devicePluginServer) {
                        if err := srv.Serve(m.namespace); err != nil {
                                fmt.Printf("Failed to serve %s/%s: %+v\n", m.namespace, dt, err)
                                os.Exit(1)
                        }
                }(devType, srv)

                srv.Update(devices)
        }
        for devType, devices := range update.Updated {
                m.servers[devType].Update(devices)
        }
        for devType := range update.Removed {
                m.servers[devType].Stop()
                delete(m.servers, devType)
        }
}

 

3. cmd/fpga_plugin/fpga_plugin.go

获得Device的具体信息

// Scan repeatedly scans the host for FPGA devices and hands each resulting
// device tree to notifier, pausing five seconds between scans. It only
// returns when a scan fails, with that scan's error.
func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error {
        const rescanInterval = 5 * time.Second

        for ; ; time.Sleep(rescanInterval) {
                tree, scanErr := dp.scanFPGAs()
                if scanErr != nil {
                        return scanErr
                }
                notifier.Notify(tree)
        }
}

4. 启动GRPC 服务

// Serve starts a gRPC server implementing pluginapi.DevicePluginServer for
// this device type. It delegates to setupAndServe using the kubelet's default
// device plugin directory (pluginapi.DevicePluginPath) and registration socket
// (pluginapi.KubeletSocket), and returns whatever error setupAndServe returns.
func (srv *server) Serve(namespace string) error {
        return srv.setupAndServe(namespace, pluginapi.DevicePluginPath, pluginapi.KubeletSocket)
}

 

// setupAndServe binds a gRPC server to the plugin socket, starts it and
// registers it with kubelet, then restarts the whole cycle whenever the
// plugin socket disappears.
//
// namespace and srv.devType form the resource name ("<namespace>/<devType>")
// and the socket file name ("<namespace>-<devType>.sock"), which is created
// inside devicePluginPath. kubeletSocket is the kubelet registration socket.
//
// It returns an error if the socket is already served by someone else, if
// listening, starting or registering fails, or if watching the socket fails.
// On every error return after the gRPC server was started, the server is
// stopped first so that no listener is left behind.
func (srv *server) setupAndServe(namespace string, devicePluginPath string, kubeletSocket string) error {
        // None of these depend on the restart iteration, so compute them once.
        resourceName := namespace + "/" + srv.devType
        pluginEndpoint := namespace + "-" + srv.devType + ".sock"
        pluginSocket := path.Join(devicePluginPath, pluginEndpoint)

        for {
                // A successful connection means another server owns the socket.
                if err := waitForServer(pluginSocket, time.Second); err == nil {
                        return errors.Errorf("Socket %s is already in use", pluginSocket)
                }
                // Best-effort cleanup of a stale socket file; if it cannot be
                // removed, net.Listen below reports the failure.
                os.Remove(pluginSocket)

                lis, err := net.Listen("unix", pluginSocket)
                if err != nil {
                        return errors.Wrap(err, "Failed to listen to plugin socket")
                }

                // Keep a local handle: srv.grpcServer is overwritten on every
                // restart, so the serving goroutine must not read the field.
                grpcServer := grpc.NewServer()
                srv.grpcServer = grpcServer
                pluginapi.RegisterDevicePluginServer(grpcServer, srv)

                // Starts device plugin service.
                go func() {
                        fmt.Printf("Start server for %s at: %s\n", srv.devType, pluginSocket)
                        // Serve returns nil after Stop/GracefulStop; anything
                        // else is an unexpected failure worth reporting.
                        if serveErr := grpcServer.Serve(lis); serveErr != nil {
                                fmt.Printf("gRPC server for %s stopped serving: %+v\n", srv.devType, serveErr)
                        }
                }()

                // Wait for the server to start
                if err = waitForServer(pluginSocket, 10*time.Second); err != nil {
                        grpcServer.Stop()
                        return err
                }

                // Register with Kubelet.
                if err = registerWithKubelet(kubeletSocket, pluginEndpoint, resourceName); err != nil {
                        grpcServer.Stop()
                        return err
                }
                fmt.Printf("Device plugin for %s registered\n", srv.devType)

                // Kubelet removes plugin socket when it (re)starts
                // plugin must restart in this case
                if err = watchFile(pluginSocket); err != nil {
                        grpcServer.Stop()
                        return err
                }
                fmt.Printf("Socket %s removed, restarting\n", pluginSocket)

                grpcServer.Stop()
                os.Remove(pluginSocket)
        }
}

  

5. 注册GRPC server

vendor/k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/api.pb.go

// RegisterRegistrationServer registers srv on the gRPC server s as the
// handler for the service described by _Registration_serviceDesc.
func RegisterRegistrationServer(s *grpc.Server, srv RegistrationServer) {
        s.RegisterService(&_Registration_serviceDesc, srv)
}

"vendor/google.golang.org/grpc/server.go"

// RegisterService attaches the implementation ss of the service described by
// sd to the gRPC server. Generated IDL code calls it, and it has to happen
// before Serve is invoked. It is fatal for ss not to implement the handler
// interface named in sd.
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
        wantIface := reflect.TypeOf(sd.HandlerType).Elem()
        implType := reflect.TypeOf(ss)
        if satisfied := implType.Implements(wantIface); !satisfied {
                grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", implType, wantIface)
        }
        s.register(sd, ss)
}

// register records the service described by sd, implemented by ss, in the
// server's service table while holding s.mu. It is fatal to register after
// Serve has been called or to register the same service name twice.
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
        s.mu.Lock()
        defer s.mu.Unlock()

        name := sd.ServiceName
        s.printf("RegisterService(%q)", name)
        if s.serve {
                grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", name)
        }
        if _, exists := s.m[name]; exists {
                grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", name)
        }

        methods := make(map[string]*MethodDesc)
        for idx := range sd.Methods {
                // Point at the slice element itself, not at a copy.
                method := &sd.Methods[idx]
                methods[method.MethodName] = method
        }

        streams := make(map[string]*StreamDesc)
        for idx := range sd.Streams {
                stream := &sd.Streams[idx]
                streams[stream.StreamName] = stream
        }

        s.m[name] = &service{
                server: ss,
                md:     methods,
                sd:     streams,
                mdata:  sd.Metadata,
        }
}

(s *Server) Serve

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	// Mark the server as serving; register refuses new services once this is set.
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	// Account for this Serve call in serveWG until it returns.
	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		select {
		// Stop or GracefulStop called; block until done and return nil.
		case <-s.quit:
			<-s.done
		default:
		}
	}()

	// Track the listener in s.lis so it can be found and closed later.
	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	}
	s.mu.Unlock()

	// On return, close and forget the listener if it is still tracked.
	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			// Temporary errors are retried with a delay that starts at 5ms,
			// doubles on each consecutive failure and is capped at 1s.
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				// Wait out the delay, but return nil right away if s.quit fires.
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit:
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			// A non-temporary Accept error ends the loop: nil if s.quit has
			// fired, the Accept error otherwise.
			select {
			case <-s.quit:
				return nil
			default:
			}
			return err
		}
		// A successful Accept resets the retry delay.
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}

 

gRPC tutorial

6.  向 kubelet 注册

// registerWithKubelet connects to the kubelet registration service over the
// unix socket kubeletSocket and sends a RegisterRequest carrying the plugin
// API version, the plugin's endpoint and the resource name. Connection and
// registration failures are returned wrapped with context.
func registerWithKubelet(kubeletSocket, pluginEndPoint, resourceName string) error {
        // Dial unix sockets instead of the default network.
        unixDialer := func(addr string, timeout time.Duration) (net.Conn, error) {
                return net.DialTimeout("unix", addr, timeout)
        }

        conn, err := grpc.Dial(kubeletSocket, grpc.WithInsecure(), grpc.WithDialer(unixDialer))
        if err != nil {
                return errors.Wrap(err, "Cannot connect to kubelet service")
        }
        defer conn.Close()

        request := &pluginapi.RegisterRequest{
                Version:      pluginapi.Version,
                Endpoint:     pluginEndPoint,
                ResourceName: resourceName,
        }

        registration := pluginapi.NewRegistrationClient(conn)
        if _, err := registration.Register(context.Background(), request); err != nil {
                return errors.Wrap(err, "Cannot register to kubelet service")
        }
        return nil
}

 

7. 定义  DevicePluginServer interface   

"vendor/k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/api.pb.go"  

// Server API for DevicePlugin service

// DevicePluginServer is the server-side API of the DevicePlugin gRPC service.
type DevicePluginServer interface {
        // GetDevicePluginOptions returns options to be communicated with Device
        // Manager
        GetDevicePluginOptions(context.Context, *Empty) (*DevicePluginOptions, error)
        // ListAndWatch returns a stream of List of Devices
        // Whenever a Device state changes or a Device disappears, ListAndWatch
        // returns the new list
        ListAndWatch(*Empty, DevicePlugin_ListAndWatchServer) error
        // Allocate is called during container creation so that the Device
        // Plugin can run device specific operations and instruct Kubelet
        // of the steps to make the Device available in the container
        Allocate(context.Context, *AllocateRequest) (*AllocateResponse, error)
        // PreStartContainer is called, if indicated by Device Plugin during registration phase,
        // before each container start. Device plugin can run device specific operations
        // such as resetting the device before making devices available to the container
        PreStartContainer(context.Context, *PreStartContainerRequest) (*PreStartContainerResponse, error)
}

具体实现

"internal/deviceplugin/server.go"  

 

参考===============================

prepare

Kubernetes的Device Plugin设计解读

深入浅出kubernetes之device-plugins

 kubernetes调度gpu

 KubeVirt:通过CRD扩展Kubernetes实现虚拟机管理

kubernetes系列之十四:Kubernetes CRD(CustomResourceDefinition)概览

 

Extend the Kubernetes API with CustomResourceDefinitions 

用户资源定义(基本上所有的项目都用到了这个)

example

 Kubernetes CRD (CustomResourceDefinition) 自定义资源类型

 

REF:

k8s 基本概念

k8s 系列介绍

API Extensions

Schedule GPUs 

 中文分析

KUBERNETES ON NVIDIA GPUS

RDMA device plugin for Kubernetes

intel-device-plugins-for-kubernetes

 

  

概念:

1. Opaque Integer Resources (OIRs) 

Scheduling • Opaque Integer Resources (OIRs) 目前已棄用,也將在 v1.9 版本移除。 • Extended Resources (ERs) 成為 OIRs 的替代 Resource。 • 使用者能夠使用 kubernetes.io/ domain 之外的任何域名前綴,不再是使用 pod.alpha.kubernetes.io/opaque-int-resource- prefix。

 

转载于:https://www.cnblogs.com/shaohef/p/9478309.html

 类似资料:

相关阅读

相关文章

相关问答