StorageEncodingOverridesKubernetes资源数据存储在ETCD中,存储的数据格式缺省为:application/json,版本使用__internal版本。
具体见:MergeGroupEncodingConfig函数,所以从获取资源的时候,我们是不需要指定版本的。
一般基于:schema.GroupResource类型去获取资源,如果要获取所有的资源,资源可以指定为“*”。
对于每个资源类型,我们可以指定参数schema.GroupResource来基于DefaultStorageFactory来获取对应资源的存储接口。
资源信息存储路径前缀缺省为:DefaultEtcdPathPrefix = "registry"
但是这个参数我们可以在运行时指定参数覆盖,具体的参数配置为:etcd-prefix
func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{
GenericServerRunOptions: genericoptions.NewServerRunOptions(),
Etcd: genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, nil)),
......
}
Etcd的属性如下:
func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions {
return &EtcdOptions{
StorageConfig: *backendConfig,
DefaultStorageMediaType: "application/json",
DeleteCollectionWorkers: 1,
EnableGarbageCollection: true,
EnableWatchCache: true,
DefaultWatchCacheSize: 100,
}
}
DefaultStorageFactory的主要作用是基于GroupResource,返回对应的存储接口。
结果包括:
DefaultStorageFactory的构建方法为:
func NewDefaultStorageFactory(
config storagebackend.Config,
defaultMediaType string, // 从EtcdOptions参数中传入的,缺省为 application/json,见NewEtcdOptions方法
defaultSerializer runtime.StorageSerializer, // 具体的值:legacyscheme.Codecs
resourceEncodingConfig ResourceEncodingConfig, // 资源编码配置情况,并不是所有的资源都按照指定的Group来存放,有些特例。另外也可以指定存储在不同etcd、不同的prefix、甚至于不同的编码存储。
resourceConfig APIResourceConfigSource, // 启用的资源版本的API情况
specialDefaultResourcePrefixes map[schema.GroupResource]string, // 见:SpecialDefaultResourcePrefixes
) *DefaultStorageFactory {
config.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
if len(defaultMediaType) == 0 {
defaultMediaType = runtime.ContentTypeJSON
}
return &DefaultStorageFactory{
StorageConfig: config, // 描述了如何创建到底层存储的连接,包含了各种存储接口storage.Interface实现的认证信息。
Overrides: map[schema.GroupResource]groupResourceOverrides{}, // 特殊资源处理
DefaultMediaType: defaultMediaType, // 缺省存储媒介类型,application/json
DefaultSerializer: defaultSerializer, // 缺省序列化实例,legacyscheme.Codecs
ResourceEncodingConfig: resourceEncodingConfig, // 资源编码配置
APIResourceConfigSource: resourceConfig, // API启用的资源版本
DefaultResourcePrefixes: specialDefaultResourcePrefixes, // 特殊资源prefix
newStorageCodecFn: NewStorageCodec, // 为提供的存储媒介类型、序列化和请求的存储与内存版本组装一个存储codec
}
}
newStroageCodecFn:用于基于请求的存储和内存的GroupVersion,以及存储的媒介、序列化实例,生成对应的runtime.Codec。看起来是为了在存储与内存数据之间进行转换的编解码期 。
前面提到了存储序列化,我们在启动apiserver的时候,可以指定--storage-versions来指定存储资源版本,最终参数数据会存放到StorageSerializationOptions中的StorageVersions字段。
StorageSerializationOptions包含了资源编码的属性,其中的DefaultStorageVersions属性没有暴漏参数,所以只能使用缺省值。如下所示,AllPreferredGroupVersions返回注册组的首选版本信息,表现形式为:group1/version1, group2/version2。
最终StorageVersions的数据会覆盖DefaultStorageVersions。
// StorageSerializationOptions contains the options for encoding resources.
type StorageSerializationOptions struct {
StorageVersions string
// The default values for StorageVersions. StorageVersions overrides
// these; you can change this if you want to change the defaults (e.g.,
// for testing). This is not actually exposed as a flag.
// 缺省的存储资源版本,StorageVersions将会覆盖自己,DefaultStorageVersion不会对外暴漏启动参数
// 这样缺省版本的数据是肯定有的,如下面的NewStorageSerializationOptions方法中,它的值为:legacyscheme.Registry.AllPreferredGroupVersions()
// 所以我们在处理存储版本的时候,缺省的版本里面的数据必须有,但是可以通过参数--storage-version来覆盖部分或者全部。
DefaultStorageVersions string
}
func NewStorageSerializationOptions() *StorageSerializationOptions {
return &StorageSerializationOptions{
DefaultStorageVersions: legacyscheme.Registry.AllPreferredGroupVersions(),
StorageVersions: legacyscheme.Registry.AllPreferredGroupVersions(),
}
}
--storage-versions 参数说明:
The per-group version to store resources in. Specified in the format "group1/version1,group2/version2,...". In the case where objects are moved from one group to the other, you may specify the format "group1=group2/v1beta1,group3/v1beta1,...". You only need to pass the groups you wish to change from the defaults. It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable. (default "admission.k8s.io/v1beta1,admissionregistration.k8s.io/v1beta1,apps/v1,authentication.k8s.io/v1,authorization.k8s.io/v1,autoscaling/v1,batch/v1,certificates.k8s.io/v1beta1,componentconfig/v1alpha1,events.k8s.io/v1beta1,extensions/v1beta1,imagepolicy.k8s.io/v1alpha1,networking.k8s.io/v1,policy/v1beta1,rbac.authorization.k8s.io/v1,scheduling.k8s.io/v1alpha1,settings.k8s.io/v1alpha1,storage.k8s.io/v1,v1")
该方法构建存储工厂,关键代码部分是调用NewDefaultStorageFactory生成DefaultStorageFactory实例。
func BuildStorageFactory(s *options.ServerRunOptions, apiResourceConfig *serverstorage.ResourceConfig) (*serverstorage.DefaultStorageFactory, error) {
// 获取group-> GroupVersion Map
storageGroupsToEncodingVersion, err := s.StorageSerialization.StorageGroupsToEncodingVersion()
if err != nil {
return nil, fmt.Errorf("error generating storage version map: %s", err)
}
// 构建了存储工厂实例,使用了DefaultStorageFactory类型,归并了缺省资源编码(defaultResourceEncoding)与用户指定的数据
storageFactory, err := kubeapiserver.NewStorageFactory(
s.Etcd.StorageConfig, // Etcd的配置
s.Etcd.DefaultStorageMediaType, // 缺省存储媒介类型:application/json
legacyscheme.Codecs, // 传统的编解码
serverstorage.NewDefaultResourceEncodingConfig(legacyscheme.Registry),
storageGroupsToEncodingVersion, // 前面创建的Group-> GroupVersion的映射
// The list includes resources that need to be stored in a different
// group version than other resources in the groups.
// FIXME (soltysh): this GroupVersionResource override should be configurable
// 下面列表列举了与组内其他资源不同,需要存储到不同的group version的资源
[]schema.GroupVersionResource{
batch.Resource("cronjobs").WithVersion("v1beta1"),
storage.Resource("volumeattachments").WithVersion("v1beta1"),
admissionregistration.Resource("initializerconfigurations").WithVersion("v1alpha1"),
},
apiResourceConfig) // 描述了启动的API信息,基于启动参数中的APIEnablementOptions生成,存放到了genericapiserver.Config的MergedResourceConfig字段中。
// 该字段合并了DefaultAPIResourceConfigSource()方法定义的资源。
if err != nil {
return nil, fmt.Errorf("error in initializing storage factory: %s", err)
}
// 同居资源绑定,约定了同居资源的查找顺序
storageFactory.AddCohabitatingResources(networking.Resource("networkpolicies"), extensions.Resource("networkpolicies"))
storageFactory.AddCohabitatingResources(apps.Resource("deployments"), extensions.Resource("deployments"))
storageFactory.AddCohabitatingResources(apps.Resource("daemonsets"), extensions.Resource("daemonsets"))
storageFactory.AddCohabitatingResources(apps.Resource("replicasets"), extensions.Resource("replicasets"))
storageFactory.AddCohabitatingResources(api.Resource("events"), events.Resource("events"))
for _, override := range s.Etcd.EtcdServersOverrides { // EtcdServersOverrides的格式:group/resource#servers,可以指定不同资源存储在不同的Etcd服务中。
tokens := strings.Split(override, "#")
if len(tokens) != 2 {
glog.Errorf("invalid value of etcd server overrides: %s", override)
continue
}
apiresource := strings.Split(tokens[0], "/")
if len(apiresource) != 2 {
glog.Errorf("invalid resource definition: %s", tokens[0])
continue
}
group := apiresource[0]
resource := apiresource[1]
groupResource := schema.GroupResource{Group: group, Resource: resource}
servers := strings.Split(tokens[1], ";")
storageFactory.SetEtcdLocation(groupResource, servers) // 为指定的服务,设置特定的Etcd服务
}
if len(s.Etcd.EncryptionProviderConfigFilepath) != 0 {
transformerOverrides, err := encryptionconfig.GetTransformerOverrides(s.Etcd.EncryptionProviderConfigFilepath)
if err != nil {
return nil, err
}
for groupResource, transformer := range transformerOverrides {
storageFactory.SetTransformer(groupResource, transformer)
}
}
return storageFactory, nil
}
最终构建好的DefaultStorageFactory,会被存储在genericapiserver.Config的RESTOptionsGetter成员中,如下代码所示:
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
s.addEtcdHealthEndpoint(c) // 添加基本的Etcd健康检查
c.RESTOptionsGetter = &storageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
return nil
}
这里我们以PodStorage为例(k8s.io/kubernetes/pkg/registry/core/pod/storage/storage.go),来分析一下pods以及相关子资源的存储是如何实现的。PodStorage的定义为:
type PodStorage struct {
Pod *REST
Binding *BindingREST
Eviction *EvictionREST
Status *StatusREST
Log *podrest.LogREST
Proxy *podrest.ProxyREST
Exec *podrest.ExecREST
Attach *podrest.AttachREST
PortForward *podrest.PortForwardREST
}
可以看到PodStorage包含了Pod以及所有的相关的子资源的存储,上述的每个成员负责一种资源的存储服务,前面我们已经提到过,存储是放在ETCD的,下面我们先看看PodStorage实例的构建。
PodStorage实例是在安装传统核心数据资源的Rest API的过程中被创建的,代码在k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go中的LegacyRESTStorageProvider.NewLegacyRESTStorage方法,在该方法中会创建各种资源数据存储实例,其中PodStorage的实例创建代码为:
podStorage := podstore.NewStorage(
restOptionsGetter,
nodeStorage.KubeletConnectionInfo,
c.ProxyTransport,
podDisruptionClient,
)
这里的restOptionsGetter也就是为构建GenericAPIServer创建的k8s.io/apiserver/pkg/server/config.go中的Config结构中的RESTOptionsGetter成员,前面我们已经分析过,基于ETCD配置构建Storage工厂之后,最终的工厂实例赋予RESTOptionsGetter成员了。下面我们来看看NewStorage的方法的代码:
// NewStorage returns a RESTStorage object that will work against pods.
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage {
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &api.Pod{} }, // NewFunc用于构建一个Pod实例
NewListFunc: func() runtime.Object { return &api.PodList{} }, // NewListFunc用于构建一个PodList实例
PredicateFunc: pod.MatchPod,
DefaultQualifiedResource: api.Resource("pods"),
CreateStrategy: pod.Strategy, // 创建、更新Pod时执行的缺省逻辑,具体的类型为podStrategy
UpdateStrategy: pod.Strategy,
DeleteStrategy: pod.Strategy,
ReturnDeletedObject: true,
TableConvertor: printerstorage.TableConvertor{TablePrinter: printers.NewTablePrinter().With(printersinternal.AddHandlers)},
}
options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: pod.GetAttrs, TriggerFunc: pod.NodeNameTriggerFunc}
if err := store.CompleteWithOptions(options); err != nil {
panic(err) // TODO: Propagate error up
}
statusStore := *store
statusStore.UpdateStrategy = pod.StatusStrategy
return PodStorage{
Pod: &REST{store, proxyTransport},
Binding: &BindingREST{store: store},
Eviction: newEvictionStorage(store, podDisruptionBudgetClient),
Status: &StatusREST{store: &statusStore},
Log: &podrest.LogREST{Store: store, KubeletConn: k},
Proxy: &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
Exec: &podrest.ExecREST{Store: store, KubeletConn: k},
Attach: &podrest.AttachREST{Store: store, KubeletConn: k},
PortForward: &podrest.PortForwardREST{Store: store, KubeletConn: k},
}
}
上面代码的关键,就是store对象的创建,store.Storage的类型为:storage.Interface接口( k8s.io/apiserver/pkg/storage/interfaces.go)。store.Storage后面我们在REST API的分析中会用到。
NewFunc负责创建一个Pod实例,在API协议的支持中会用到它去创建一个Pod实例,我们在看store.CompleteWithOptions(options)中,实现了一些其它成员的填充,主要是Storage、DestroyFunc:
e.Storage, e.DestroyFunc = opts.Decorator(
opts.StorageConfig,
e.NewFunc(),
prefix,
keyFunc,
e.NewListFunc,
attrFunc,
triggerFunc,
)
可以看到Storage成员用到了前面说到的NewFunc和NewListFunc。在API Server的启动流程中,我们知道在构建genericserver.Config对象时,调用了EtcdOptions.ApplyWithStorageFactoryTo方法时,赋值了RESTOptionsGetter这个成员。
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
s.addEtcdHealthEndpoint(c)
c.RESTOptionsGetter = &storageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
return nil
}
所以genericserver.Config.RESTOptionsGetter的实例类型是:storageFactoryRestOptionsFactory,下面是他的GetRESTOptions方法:
func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
storageConfig, err := f.StorageFactory.NewConfig(resource)
if err != nil {
return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
}
ret := generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
}
if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
if err != nil {
return generic.RESTOptions{}, err
}
cacheSize, ok := sizes[resource]
if !ok {
cacheSize = f.Options.DefaultWatchCacheSize
}
ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
}
return ret, nil
}
在这返回了一个RESTOptions对象,是调用genericregistry.StorageWithCacher(cacheSize)构建出来的generic.StorageDecorator实例,其实它是一个函数,用来返回Storage以及DestroyFunc方法,如下所示:
// Creates a cacher based given storageConfig.
func StorageWithCacher(capacity int) generic.StorageDecorator {
return func(
storageConfig *storagebackend.Config,
objectType runtime.Object,
resourcePrefix string,
keyFunc func(obj runtime.Object) (string, error),
newListFunc func() runtime.Object,
getAttrsFunc storage.AttrFunc,
triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
// 创建一个裸的ETCD存储接口实例
s, d := generic.NewRawStorage(storageConfig)
if capacity == 0 {
glog.V(5).Infof("Storage caching is disabled for %T", objectType)
return s, d
}
glog.V(5).Infof("Storage caching is enabled for %T with capacity %v", objectType, capacity)
// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
// Currently it has two layers of same storage interface -- cacher and low level kv.
cacherConfig := storage.CacherConfig{
CacheCapacity: capacity,
Storage: s,
Versioner: etcdstorage.APIObjectVersioner{},
Type: objectType,
ResourcePrefix: resourcePrefix,
KeyFunc: keyFunc,
NewListFunc: newListFunc,
GetAttrsFunc: getAttrsFunc,
TriggerPublisherFunc: triggerFunc,
Codec: storageConfig.Codec,
}
// 基于裸ETCD存储实例创建带缓存能力的存储
cacher := storage.NewCacherFromConfig(cacherConfig)
// destroyFunc是用于释放存储本身资源的。
destroyFunc := func() {
cacher.Stop()
d()
}
// TODO : Remove RegisterStorageCleanup below when PR
// https://github.com/kubernetes/kubernetes/pull/50690
// merges as that shuts down storage properly
RegisterStorageCleanup(destroyFunc)
return cacher, destroyFunc
}
}
事情慢慢接近事情的本质了,最终的存储实例的生成是调用了generic.RESTOptions.Decorator( 实际值generic.UndecoratedStorage) 方法, 也就是上述代码返回的方法,在上述方法中,首先创建了一个裸的ETCD存储,然后在上面封装了一个Cache存储,下面我们在看看创建裸ETCD存储的代码(以etcd3 存储类型为例):
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) {
s, d, err := factory.Create(*config)
if err != nil {
glog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err)
}
return s, d
}
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case storagebackend.StorageTypeETCD2:
return newETCD2Storage(c)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
// TODO: We have the following features to implement:
// - Support secure connection by using key, cert, and CA files.
// - Honor "https" scheme to support secure connection in gRPC.
// - Support non-quorum read.
return newETCD3Storage(c)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
tlsInfo := transport.TLSInfo{
CertFile: c.CertFile,
KeyFile: c.KeyFile,
CAFile: c.CAFile,
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, nil, err
}
// NOTE: Client relies on nil tlsConfig
// for non-secure connections, update the implicit variable
if len(c.CertFile) == 0 && len(c.KeyFile) == 0 && len(c.CAFile) == 0 {
tlsConfig = nil
}
cfg := clientv3.Config{
DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout,
Endpoints: c.ServerList,
TLS: tlsConfig,
}
client, err := clientv3.New(cfg)
if err != nil {
return nil, nil, err
}
ctx, cancel := context.WithCancel(context.Background())
etcd3.StartCompactor(ctx, client, c.CompactionInterval)
destroyFunc := func() {
cancel()
client.Close()
}
transformer := c.Transformer
if transformer == nil {
transformer = value.IdentityTransformer
}
if c.Quorum {
return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}
return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}
// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
return newStore(c, true, pagingEnabled, codec, prefix, transformer)
}
func newStore(c *clientv3.Client, quorumRead, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
versioner := etcd.APIObjectVersioner{} // 实现了版本化
result := &store{
client: c,
codec: codec,
versioner: versioner,
transformer: transformer,
pagingEnabled: pagingEnabled,
// for compatibility with etcd2 impl.
// no-op for default prefix of '/registry'.
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
pathPrefix: path.Join("/", prefix),
watcher: newWatcher(c, codec, versioner, transformer),
}
if !quorumRead {
// In case of non-quorum reads, we can set WithSerializable()
// options for all Get operations.
result.getOps = append(result.getOps, clientv3.WithSerializable())
}
return result
}
上述store以及cache对象都实现storage.Interface接口,对外提供统一的Pod数据资源的存储服务。
现在我们在回到PodStorage上来,我们看看它的成员你的定义:
// REST自动集成了genericregistry.Store的方法
type REST struct {
*genericregistry.Store
proxyTransport http.RoundTripper
}
type BindingREST struct {
store *genericregistry.Store
}
......
所以构建了genericregistry.Store实例,就完成Pod资源以及其子资源存储对象的关键,那么整个PodStorage的构建也就完成了。
在API Install源码分析中,我们解释了最终完成从rest.Storage到http.Route的转换的。具体细节这里不再说明,主要的原理,就是看对应的Storage对象实现了资源数据对象的什么接口,实现了对应的接口,最终就会生成相应的REST API。
依然以PodStorage为例,来进行说明,PodStorage包含了各种成员,它们负责的接口关系如下:
PodStorage Rest 存储对象 | 对应API Rest框架的接口 | 接口的功能 |
---|---|---|
REST | rest.Redirector、rest.CreaterUpdate、rest.Lister、rest.Watcher、rest.GracefulDeleter、rest.Getter | 重定向资源的路径、资源创建更新接口、资源列表查询接口、Watcher资源变化接口、支持延迟的资源删除接口、获取具体资源的信息接口 |
BindngREST | rest.Creater | 创建资源的接口 |
StatusREST | rest.Updater | 更新资源的接口 |
LogREST | rest.Updater | 获取资源的接口 |
ExecREST\ProxyREST\PortForwardREST | rest.Connecter | 连接资源的接口 |
······
可以看到PodStorage.REST实现了不少REST功能,它是怎么实现的呢?这里再次看看它的定义:
type REST struct {
*genericregistry.Store
proxyTransport http.RoundTripper
}
可以看到REST直接继承了genericregistry.Store的所有函数,这里我们以Creater为例来介绍它是如何实现一个Pod的创建的,Creater的接口定义如下:
// Creater is an object that can create an instance of a RESTful object.
type Creater interface {
// New returns an empty object that can be used with Create after request data has been put into it.
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
New() runtime.Object
// Create creates a new version of a resource. If includeUninitialized is set, the object may be returned
// without completing initialization.
Create(ctx genericapirequest.Context, obj runtime.Object, createValidation ValidateObjectFunc, includeUninitialized bool) (runtime.Object, error)
}
而genericregistry.Store实现了相关的方法,
主要分为几个步骤:
// Create inserts a new item according to the unique key from the object.
func (e *Store) Create(ctx genericapirequest.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, includeUninitialized bool) (runtime.Object, error) {
// BeforeCreate把creation之前的通用操作完成。会调用PrepareForCreate,GenerateName,Validate
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return nil, err
}
// at this point we have a fully formed object. It is time to call the validators that the apiserver
// handling chain wants to enforce.
if createValidation != nil {
if err := createValidation(obj.DeepCopyObject()); err != nil {
return nil, err
}
}
// 获取对象的名字
name, err := e.ObjectNameFunc(obj)
if err != nil {
return nil, err
}
// 获取KEY
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, err
}
qualifiedResource := e.qualifiedResourceFromContext(ctx)
ttl, err := e.calculateTTL(obj, 0, false)
if err != nil {
return nil, err
}
// 生成一个空的对象,而对于PodStorage的store的创建中,我们已经知道,它实际是生成一个Pod对象
out := e.NewFunc()
// 这里用到Storage,也分析过,最终会调用Etcd,并且在这里会调用runtime.Codec完成从对象到字符串的转换,最终保存到etcd中。
if err := e.Storage.Create(ctx, key, obj, out, ttl); err != nil {
err = storeerr.InterpretCreateError(err, qualifiedResource, name)
err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj)
if !kubeerr.IsAlreadyExists(err) {
return nil, err
}
if errGet := e.Storage.Get(ctx, key, "", out, false); errGet != nil {
return nil, err
}
accessor, errGetAcc := meta.Accessor(out)
if errGetAcc != nil {
return nil, err
}
if accessor.GetDeletionTimestamp() != nil {
msg := &err.(*kubeerr.StatusError).ErrStatus.Message
*msg = fmt.Sprintf("object is being deleted: %s", *msg)
}
return nil, err
}
if e.AfterCreate != nil {
if err := e.AfterCreate(out); err != nil {
return nil, err
}
}
if e.Decorator != nil {
if err := e.Decorator(obj); err != nil {
return nil, err
}
}
if !includeUninitialized {
return e.WaitForInitialized(ctx, out)
}
return out, nil
}
那具体写入etcd是如何完成的呢?前面我们列举了etcd3的store类型对象(实现了storage.Interface)的构建,这里看看基于etcd3如何把对象写入到etcd中,代码如下:
// Create implements storage.Interface.Create.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
// version不能被设置
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion should not be set on objects to be created")
}
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
}
// 序列化,这里应该是编码为json字符串
data, err := runtime.Encode(s.codec, obj)
if err != nil {
return err
}
// 找到etcd路径
key = path.Join(s.pathPrefix, key)
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
// 按需做存储之前的数据转换
newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
// 基于etcd3的api,写入数据
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(key),
).Then(
clientv3.OpPut(key, string(newData), opts...),
).Commit()
if err != nil {
return err
}
if !txnResp.Succeeded {
return storage.NewKeyExistsError(key, 0)
}
if out != nil {
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
return nil
}
前面我们看到了如何对外提供API服务,并进行数据的最终的存储处理操作。那么存储数据时的编解码是如何进行的呢,这里专门划出一章来分析。
首先,我们看一下Etcd的启动参数,缺省如下所示:
func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions {
return &EtcdOptions{
StorageConfig: *backendConfig,
DefaultStorageMediaType: "application/json", // 缺省的存储没接了类型
DeleteCollectionWorkers: 1,
EnableGarbageCollection: true,
EnableWatchCache: true, // 缺省是带了WatchCache的,要取消,必须设置参数为--watch-cache=false 来取消
DefaultWatchCacheSize: 100,
}
}
从上面可以看出,缺省的存储媒介类型为"applicatoin/json",并且缺省是启用了watch-cache功能的,下面还是以PodStorage的初始化为例来进行分析,看如何对Pod的存储进行编解码的。
在前面的分析中,我们也知道了PodStorage初始时,会构建一个store成员,store成员中genericserver.Config.RESTOptionsGetter实际存储的对象类型为storageFactoryRestOptionsFactory,最终,我们在store.CompleteWithOptions的调用中,创建真正的存储实例:
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
......
// 在PodStorage的初始化中,DefaultQualifiedResource为api.Resource("pods")
opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
......
if e.Storage == nil {
e.Storage, e.DestroyFunc = opts.Decorator(
opts.StorageConfig,
e.NewFunc(),
prefix,
keyFunc,
e.NewListFunc,
attrFunc,
triggerFunc,
)
}
return nil
}
所以从上面的代码可以看出,我们主要通过两个步骤来生成存储对象接口。
func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
// 这里是编解码的关键,编解码的初始化在这里,这里的StorageFactory的实例对象为DefaultStorageFactory
storageConfig, err := f.StorageFactory.NewConfig(resource)
if err != nil {
return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
}
ret := generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage, // 这里将直接返回Raw Etcd Storage
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
}
// 如果启动了Watch Cache,注意只是Watch Cache ......
// 则Decorator会被初始化为genericregistry.StorageWithCahce...
if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
if err != nil {
return generic.RESTOptions{}, err
}
cacheSize, ok := sizes[resource]
if !ok {
cacheSize = f.Options.DefaultWatchCacheSize
}
ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
}
return ret, nil
}
要了解具体的编解码还得继续研究DefaultStorageFactory.NewConfig的代码:
func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error) {
// 查看是否有共生的资源,如果没有就返回自己
chosenStorageResource := s.getStorageGroupResource(groupResource)
// operate on copy
storageConfig := s.StorageConfig
codecConfig := StorageCodecConfig{
StorageMediaType: s.DefaultMediaType, /
StorageSerializer: s.DefaultSerializer, // 这里传入的实际值为legacyscheme.Codecs
}
if override, ok := s.Overrides[getAllResourcesAlias(chosenStorageResource)]; ok {
override.Apply(&storageConfig, &codecConfig)
}
if override, ok := s.Overrides[chosenStorageResource]; ok {
override.Apply(&storageConfig, &codecConfig)
}
// ResourceEncodingConfig相关内容。
// 分为缺省的,以及Override的逻辑,
// 资源变量中包含Internel、Externel两种,Internel是内存数据对应的GroupVersion,Externel则是底层存储的GroupVersion
// 如果找不到的情况下, 一般我们会给内存数据这个缺省定义版本:APIVersionInternal = "__internal"
// TODO:这块打算下面章节专门讲解
var err error
codecConfig.StorageVersion, err = s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
if err != nil {
return nil, err
}
codecConfig.MemoryVersion, err = s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource)
if err != nil {
return nil, err
}
codecConfig.Config = storageConfig
// 这里的newStorageCodecFn为 storage.NewStorageCodec,见NewDefaultStorageFactory
storageConfig.Codec, err = s.newStorageCodecFn(codecConfig)
if err != nil {
return nil, err
}
glog.V(3).Infof("storing %v in %v, reading as %v from %#v", groupResource, codecConfig.StorageVersion, codecConfig.MemoryVersion, codecConfig.Config)
return &storageConfig, nil
}
从函数代码中看来,有两块我们需要去弄清楚:ResourceEncodingConfig和Codec的生成逻辑,ResourceEncodingConfig我们放到下一个章节进行说明,这里继续分析NewStorageCodec来看看,是如何生成Codec对象的。见:k8s.io/apiserver/pkg/server/storage/storage_codec.go
// NewStorageCodec assembles a storage codec for the provided storage media type, the provided serializer, and the requested
// storage and memory versions.
func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, error) {
mediaType, _, err := mime.ParseMediaType(opts.StorageMediaType) // 这里一般为: application/json的话,返回的mediaType 也是application/json
if err != nil {
return nil, fmt.Errorf("%q is not a valid mime-type", opts.StorageMediaType)
}
// ETCD2 只支持 application/json
if opts.Config.Type == storagebackend.StorageTypeETCD2 && mediaType != "application/json" {
glog.Warningf(`storage type %q does not support media type %q, using "application/json"`, storagebackend.StorageTypeETCD2, mediaType)
mediaType = "application/json"
}
// 寻找对应的媒介类型的序列化实例,如果找不到,系统会报错
// 注意:在前面分析DefaultStorageFactory.NewConfig中,知道opts.StorageSerializer = DefaultStorageFactory.DefaultSerializer = legacyscheme.Codecs
serializer, ok := runtime.SerializerInfoForMediaType(opts.StorageSerializer.SupportedMediaTypes(), mediaType)
if !ok { // 如果找不到,系统会报错
return nil, fmt.Errorf("unable to find serializer for %q", mediaType)
}
// 知道对应的媒介类型的序列化器,具体可以参见一下序列化工厂部分文档
s := serializer.Serializer
// etcd2不支持二进制格式,只支持文本格式。
// make sure the selected encoder supports string data
if !serializer.EncodesAsText && opts.Config.Type == storagebackend.StorageTypeETCD2 {
return nil, fmt.Errorf("storage type %q does not support binary media type %q", storagebackend.StorageTypeETCD2, mediaType)
}
// serializer实现了Encoder,Decoder接口
// Give callers the opportunity to wrap encoders and decoders. For decoders, each returned decoder will
// be passed to the recognizer so that multiple decoders are available.
var encoder runtime.Encoder = s
if opts.EncoderDecoratorFn != nil { // Encoder封装函数
encoder = opts.EncoderDecoratorFn(encoder)
}
decoders := []runtime.Decoder{
// selected decoder as the primary
s,
// universal deserializer as a fallback
opts.StorageSerializer.UniversalDeserializer(), // s解析不了的,采用万能解析器
// base64-wrapped universal deserializer as a last resort.
// this allows reading base64-encoded protobuf, which should only exist if etcd2+protobuf was used at some point.
// data written that way could exist in etcd2, or could have been migrated to etcd3.
// TODO: flag this type of data if we encounter it, require migration (read to decode, write to persist using a supported encoder), and remove in 1.8
runtime.NewBase64Serializer(nil, opts.StorageSerializer.UniversalDeserializer()), // 最后尝试Base64解析器
}
if opts.DecoderDecoratorFn != nil { // Decoder封装函数
decoders = opts.DecoderDecoratorFn(decoders)
}
// 注意之类的opts.StorageSerializer = legacyscheme.Codecs,实际结构为CodecFactory
// Ensure the storage receives the correct version.
encoder = opts.StorageSerializer.EncoderForVersion(
encoder,
runtime.NewMultiGroupVersioner( // 构建一个GroupVersioner,该GroupVersioner接受StorageVersion.Group和MemoryVersion.Group,返回对应StorageVersion的GVK。
opts.StorageVersion,
schema.GroupKind{Group: opts.StorageVersion.Group},
schema.GroupKind{Group: opts.MemoryVersion.Group},
),
)
decoder := opts.StorageSerializer.DecoderToVersion(
recognizer.NewDecoder(decoders...),
runtime.NewMultiGroupVersioner( // 构建一个GroupVersioner,该GroupVersioner接受MemoryVersion.Group和StorageVersion.Group,返回对应MemoryVersion的GVK。
opts.MemoryVersion,
schema.GroupKind{Group: opts.MemoryVersion.Group},
schema.GroupKind{Group: opts.StorageVersion.Group},
),
)
return runtime.NewCodec(encoder, decoder), nil
}
上述代码生成了Codec对象,代码逻辑比较清晰,主要逻辑是基于StorageFactory中存户的Serializer进行封装,封装出来的Encoder和Decoder能够支持版本化,最终生成统一的Codec实例。
有两个地方需要说明一下。
第一点,GroupVersioner负责提炼一系列可供转换的GVK目标,并把他们转换成统一的GVK,在上述代码中,encoder的构建过程中,通过runtime.NewMultiGroupVersioner生成了一个multiGroupVersioner对象,在初始化过程中,我们可以看到它的目标为StorageVersion,而接受的输入Group有StorageVersion.Group与MemoryVersion.Group,在它的实现方法中,可以支持接受StorageVersion.Group和MemoryVersion.Group的数据,并最终转换StorageVersion对应的Group与Version,同时加上输入对象本身的Kind,从而得到最终的GVK,如下所示:
func (v multiGroupVersioner) KindForGroupVersionKinds(kinds []schema.GroupVersionKind) (schema.GroupVersionKind, bool) {
for _, src := range kinds {
for _, kind := range v.acceptedGroupKinds {
if kind.Group != src.Group {
continue
}
if len(kind.Kind) > 0 && kind.Kind != src.Kind {
continue
}
return v.target.WithKind(src.Kind), true // 这里基于目标GV生成GVK
}
}
return schema.GroupVersionKind{}, false
}
decoder部分的GroupVersioner的对象也是与encoder部分一样,只是目标GV反过来了。
第二点,分析一下EncoderForVersion的函数调用,该函数负责输入一个Encoder和GroupVersioner,返回一个encoder,并且该encoder能够保证写到指定的序列化器的对象是指定的GV。(注意断句:该对象是指定的GV,该对象会被写入到指定的serailizer中)。
type StorageSerializer interface {
// SupportedMediaTypes are the media types supported for reading and writing objects.
SupportedMediaTypes() []SerializerInfo
// UniversalDeserializer returns a Serializer that can read objects in multiple supported formats
// by introspecting the data at rest.
UniversalDeserializer() Decoder
// 返回一个encoder,该encoder能够保证写入底层序列化器的对象是指定的GV
// EncoderForVersion returns an encoder that ensures objects being written to the provided
// serializer are in the provided group version.
EncoderForVersion(serializer Encoder, gv GroupVersioner) Encoder
// 返回一个decoder,该decoder能够保证被底层序列化器的反序列化的对象是指定的GV
// DecoderForVersion returns a decoder that ensures objects being read by the provided
// serializer are in the provided group version by default.
DecoderToVersion(serializer Decoder, gv GroupVersioner) Decoder
}
同时,前面我们多次分析过,这里opts.StorageSerializer的值为legacyscheme.Codecs。
legacyscheme.Codecs的实例构建,var Codecs = serializer.NewCodecFactory(Scheme),它是一个CodecFactory实例,见API Server编解码,CodecFactory实例缺省会支持json/yaml/protobuf三种编解码。
下面我们再次回顾一下PodStorage对外提供REST服务的过程,在提供Create REST服务时,最终调用了storage.Interface.Create方法,storage.Interface实际对应于etcd3.store,在它的Create方法中,最终,调用了runtime.Encode来完成编解码工作。
从整个etcd3.store实例的创建过程来看,这里的s.codec成员是来自于storagebackend.Config.Codec。
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
......
// 序列化,这里应该是编码为json字符串
data, err := runtime.Encode(s.codec, obj)
if err != nil {
return err
}
......
return nil
}
Codec的生成我们前面也进行过分析,主要是在DefaultStorageFactory.NewConfig,具体的分析见上面,下面我们摘取一部分代码来分析,Codec的encoder和decoder成员如下所示:
// 注意之类的opts.StorageSerializer = legacyscheme.Codecs,实际结构为CodecFactory
// Ensure the storage receives the correct version.
encoder = opts.StorageSerializer.EncoderForVersion(
encoder,
runtime.NewMultiGroupVersioner( // 构建一个GroupVersioner,该GroupVersioner接受StorageVersion.Group和MemoryVersion.Group,返回对应StorageVersion的GVK。
opts.StorageVersion,
schema.GroupKind{Group: opts.StorageVersion.Group},
schema.GroupKind{Group: opts.MemoryVersion.Group},
),
)
decoder := opts.StorageSerializer.DecoderToVersion(
recognizer.NewDecoder(decoders...),
runtime.NewMultiGroupVersioner( // 构建一个GroupVersioner,该GroupVersioner接受MemoryVersion.Group和StorageVersion.Group,返回对应MemoryVersion的GVK。
opts.MemoryVersion,
schema.GroupKind{Group: opts.MemoryVersion.Group},
schema.GroupKind{Group: opts.StorageVersion.Group},
),
)
所以,我们在往Etcd中存储写入时,目标Group和版本为opts.StorageVersion,而从Etcd中读取数据时则为opts.MemoryVersion,现在我们来看看,这两个究竟分别是什么(还是以Pod为例)。
这里我们先给一下答案在往下分析:组版本的注册和启用信息是存储在Registry这样一个全局变量中,它的类型是APIRegistrationManager,它有一个成员groupMetaMap类型为map[string]*apimachinery.GroupMeta,用来存储不同组的Metadata信息。而对应组的信息是存储在GroupMeta对象中。这里我们主要是分析Pod,那么它是属于核心组(core),一般来说它的组名是“”,而版本是“v1”。所以SotargeVersion值为{group:"", verison:"v1"},而MemoryVersion是什么呢,通过InMemoryEncodingFor的方法,我们可以看到,缺省情况下,它的版本是"__internal",所以这里值为:{group:"", version:"__internal"}。
代码也是在DefaultStarageFactory.NewConfig函数中,如下所示:
var err error
codecConfig.StorageVersion, err = s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
if err != nil {
return nil, err
}
codecConfig.MemoryVersion, err = s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource)
if err != nil {
return nil, err
}
ResourceEncodingConfig的内容是什么,前面我们做过基本的分析,但是不够透彻,这里作为一个章节单独进行详细的分析。ResourceEncodingConfig的内容应该是由三部分组成的:
下面将对这三部分分别进行说明,然后再研究,他们怎么组合起来的。
代码见k8s.io/kubernetes/cmd/kube-apiserver/app/server.go中的BuildStorageFactory方法,中,调用了NewStorageFactory方法时,传入的defaultResourceEncoding *serverstorage.DefaultResourceEncodingConfig参数中赋值为:
serverstorage.NewDefaultResourceEncodingConfig(legacyscheme.Registry)
func NewDefaultResourceEncodingConfig(registry *registered.APIRegistrationManager) *DefaultResourceEncodingConfig {
return &DefaultResourceEncodingConfig{groups: map[string]*GroupResourceEncodingConfig{}, registry: registry}
}
从这里可以看出,DefaultResourceEncodingConfig中,传入的groups成员是个空的map数据,那么这里的缺省的资源编码配置又是如何获得的呢?
答案:从registry中找出对应GroupVersion的GroupMeta(组元数据),GroupMeta.GroupVersion成员。
这里我们就需要先来看一下前面我们会用到的两个方法:StorageEncodingFor(chosenStorageResource)和InMemoryEncodingFor(chosenStorageResource)的定义。
func (o *DefaultResourceEncodingConfig) StorageEncodingFor(resource schema.GroupResource) (schema.GroupVersion, error) {
// 从registry中找到对应的组的元数据
groupMeta, err := o.registry.Group(resource.Group)
if err != nil {
return schema.GroupVersion{}, err
}
// 从groups成员看是否有该组的专有编码
groupEncoding, groupExists := o.groups[resource.Group]
// 如果没有专有编码说明,则直接使用组元数据中的GroupVersion值
if !groupExists {
// return the most preferred external version for the group
return groupMeta.GroupVersion, nil
}
// 否则查询该组资源的 Externel 资源编码
resourceOverride, resourceExists := groupEncoding.ExternalResourceEncodings[resource.Resource]
if !resourceExists {
return groupEncoding.DefaultExternalEncoding, nil // 如果找不到,则直接返回该组编码的缺省外部编码
}
return resourceOverride, nil // 返回对应组资源的外部编码
}
InMemoryEncodingFor函数与StorageEncodingFor基本上相同,除了是访问内部编码资源外,没什么区别,所以不列举代码了。
对于专有组与组资源编码的存储成员groups是如何赋值的,在后面两节中会专门说明,这里主要分析一下另外一个成员registry,registry在这里的实际的值为legacyscheme.Registry,下面我们来分析一下该对象,它的定义为:
var Registry = registered.NewOrDie(os.Getenv("KUBE_API_VERSIONS"))
Registry的类型为APIRegistrationManager,APIRegistrationManager提供了注册的Group Version以及启用的API groups的信息。
type APIRegistrationManager struct {
// registeredGroupVersions stores all API group versions for which RegisterGroup is called.
registeredVersions map[schema.GroupVersion]struct{} // 注册的group version
// enabledVersions represents all enabled API versions. It should be a
// subset of registeredVersions. Please call EnableVersions() to add
// enabled versions.
enabledVersions map[schema.GroupVersion]struct{} // 启用的group version , 是注册GV的子集
// map of group meta for all groups.
groupMetaMap map[string]*apimachinery.GroupMeta // 所有组的元数据信息
// envRequestedVersions represents the versions requested via the
// KUBE_API_VERSIONS environment variable. The install package of each group
// checks this list before add their versions to the latest package and
// Scheme. This list is small and order matters, so represent as a slice
envRequestedVersions []schema.GroupVersion
}
前面我们看到,legacyscheme.Registry是一个全局变量,那么究竟有注册了哪些版本和启用了哪些版本呢?这里,我们要了解Go语言的机制,func init()函数会在引用某个包的时候,自动被调用,经过仔细分析,可以发现在k8s.io/kubernetes/pkg/apis/core/install/install.go中,有一个这样的函数:
func init() {
Install(legacyscheme.GroupFactoryRegistry, legacyscheme.Registry, legacyscheme.Scheme)
}
注意,这里我们分析的是PodStorage,所以只看了core资源这块,对于API的数据资源有很多个Group,包括但不限于:Core、abac、apps、authentication、authorization、autoscaling、batch、componentconfig、extensions、policy、rbac、certifactes、networking,新的版本会不断的增加新的组。每个组都处于k8s.io/pkg/apis下的一个子目录中,每个字段都会有一段func init()函数,目前他们的内容都是一样,也都是向legacyscheme.Registry和legacyscheme.Scheme注册信息。
调用Install函数,Install函数负责注册API Group和把各种资源数据对象添加到Scheme中,所以在这个函数中同时完成legacyscheme.Registry和legacyscheme.Scheme两个变量。
// Install registers the API group and adds types to a scheme
func Install(groupFactoryRegistry announced.APIGroupFactoryRegistry, registry *registered.APIRegistrationManager, scheme *runtime.Scheme) {
if err := announced.NewGroupMetaFactory(
&announced.GroupMetaFactoryArgs{
GroupName: core.GroupName, // 这里为“”
VersionPreferenceOrder: []string{v1.SchemeGroupVersion.Version},
AddInternalObjectsToScheme: core.AddToScheme, // 添加内部对象的AddToScheme方法
RootScopedKinds: sets.NewString(
"Node",
"Namespace",
"PersistentVolume",
"ComponentStatus",
),
IgnoredKinds: sets.NewString(
"ListOptions",
"DeleteOptions",
"Status",
"PodLogOptions",
"PodExecOptions",
"PodAttachOptions",
"PodPortForwardOptions",
"PodProxyOptions",
"NodeProxyOptions",
"ServiceProxyOptions",
),
},
announced.VersionToSchemeFunc{
v1.SchemeGroupVersion.Version: v1.AddToScheme, // 这里描述了版本与AddToScheme方法的对应关系,在RegisterAndEnable过程中会被调用。
},
).Announce(groupFactoryRegistry).RegisterAndEnable(registry, scheme); err != nil {
panic(err)
}
}
函数中有三个参数其中groupFactoryRegistry这个参数暂时不知道有啥用处,上面的Announce方法调用,会往里面注册前面创建的GroupMetaFactory对象。
下面我们来分析RegisterAndEnable地方
// RegisterAndEnable is provided only to allow this code to get added in multiple steps.
// It's really bad that this is called in init() methods, but supporting this
// temporarily lets us do the change incrementally.
func (gmf *GroupMetaFactory) RegisterAndEnable(registry *registered.APIRegistrationManager, scheme *runtime.Scheme) error {
if err := gmf.Register(registry); err != nil { // 注册GV信息,只有注册以后的版本,才能Enable
return err
}
if err := gmf.Enable(registry, scheme); err != nil { // Enable版本,添加资源数据对象到Scheme中
return err
}
return nil
}
注册GV到registry中的方法如下:
// Register constructs the finalized prioritized version list and sanity checks
// the announced group & versions. Then it calls register.
func (gmf *GroupMetaFactory) Register(m *registered.APIRegistrationManager) error {
if gmf.GroupArgs == nil {
return fmt.Errorf("partially announced groups are not allowed, only got versions: %#v", gmf.VersionArgs)
}
if len(gmf.VersionArgs) == 0 {
return fmt.Errorf("group %v announced but no versions announced", gmf.GroupArgs.GroupName)
}
pvSet := sets.NewString(gmf.GroupArgs.VersionPreferenceOrder...)
if pvSet.Len() != len(gmf.GroupArgs.VersionPreferenceOrder) {
return fmt.Errorf("preference order for group %v has duplicates: %v", gmf.GroupArgs.GroupName, gmf.GroupArgs.VersionPreferenceOrder)
}
prioritizedVersions := []schema.GroupVersion{} // 版本信息
for _, v := range gmf.GroupArgs.VersionPreferenceOrder {
prioritizedVersions = append(
prioritizedVersions,
schema.GroupVersion{
Group: gmf.GroupArgs.GroupName, // 组,这里为“”
Version: v, // 这里为v1
},
)
}
// Go through versions that weren't explicitly prioritized.
unprioritizedVersions := []schema.GroupVersion{}
for _, v := range gmf.VersionArgs {
if v.GroupName != gmf.GroupArgs.GroupName {
return fmt.Errorf("found %v/%v in group %v?", v.GroupName, v.VersionName, gmf.GroupArgs.GroupName)
}
if pvSet.Has(v.VersionName) {
pvSet.Delete(v.VersionName)
continue
}
unprioritizedVersions = append(unprioritizedVersions, schema.GroupVersion{Group: v.GroupName, Version: v.VersionName})
}
if len(unprioritizedVersions) > 1 {
glog.Warningf("group %v has multiple unprioritized versions: %#v. They will have an arbitrary preference order!", gmf.GroupArgs.GroupName, unprioritizedVersions)
}
if pvSet.Len() != 0 {
return fmt.Errorf("group %v has versions in the priority list that were never announced: %s", gmf.GroupArgs.GroupName, pvSet)
}
prioritizedVersions = append(prioritizedVersions, unprioritizedVersions...)
m.RegisterVersions(prioritizedVersions) // 注册GV,这里其实就是 {group="", version="v1"}
gmf.prioritizedVersionList = prioritizedVersions
return nil
}
存储用到了legacyscheme域的Scheme和Registry,从上面的代码分析,这里只注册了{group="", version="v1"},下面来看Enable函数的代码。
func (gmf *GroupMetaFactory) Enable(m *registered.APIRegistrationManager, scheme *runtime.Scheme) error {
externalVersions := []schema.GroupVersion{}
for _, v := range gmf.prioritizedVersionList {
if !m.IsAllowedVersion(v) { // 可以在KUBE_API_VERSIONS中配置启用的版本,一般都是调试的时候用。
continue
}
externalVersions = append(externalVersions, v)
if err := m.EnableVersions(v); err != nil {
return err
}
gmf.VersionArgs[v.Version].AddToScheme(scheme) // 这里调用了AddToScheme方法,会把对应版本的资源数据对象添加到scheme中。
}
if len(externalVersions) == 0 {
glog.V(4).Infof("No version is registered for group %v", gmf.GroupArgs.GroupName)
return nil
}
if gmf.GroupArgs.AddInternalObjectsToScheme != nil {
gmf.GroupArgs.AddInternalObjectsToScheme(scheme) // 添加internel 对象到Scheme中。
}
preferredExternalVersion := externalVersions[0]
accessor := meta.NewAccessor()
groupMeta := &apimachinery.GroupMeta{
GroupVersion: preferredExternalVersion, // 对于核心组""来说,这里是 {"", v1}
GroupVersions: externalVersions, // 对于核心族来说这里是 []string{"v1"}
SelfLinker: runtime.SelfLinker(accessor),
}
for _, v := range externalVersions {
gvf := gmf.VersionArgs[v.Version]
if err := groupMeta.AddVersionInterfaces( // 这里为每个版本添加VersionInterface,VersionInterface包括两个部分:
// 1. ObjectConverter其实也就是scheme,负责不同版本资源数据转换
// 2. 而MetadataAccessor负责访问各种资源的基础信息,如Annotations,Name等信息
schema.GroupVersion{Group: gvf.GroupName, Version: gvf.VersionName},
&meta.VersionInterfaces{
ObjectConvertor: scheme,
MetadataAccessor: accessor,
},
); err != nil {
return err
}
}
groupMeta.InterfacesFor = groupMeta.DefaultInterfacesFor // 该方法主要根据group version来返回前面注册的各个版本的VersionInterface
groupMeta.RESTMapper = gmf.newRESTMapper(scheme, externalVersions, groupMeta)
if err := m.RegisterGroup(*groupMeta); err != nil {
return err
}
return nil
}
可以看Enable是关键,主要干了两件事情,把资源对象注册到Scheme中和在Registry中启用对应的GroupVersion信息。启用的GroupVersion信息,会把相应的GroupVersion的Metadata也进行了初始化。
第一点:资源对象注册到Scheme,这里是调用了AddToScheme方法,对于core这个组下面的资源的代码在k8s.io/kubernetes/pkg/apis/core/v1/register.go中,如下所示:
var (
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
AddToScheme = SchemeBuilder.AddToScheme
)
func addKnownTypes(scheme *runtime.Scheme) error {
if err := scheme.AddIgnoredConversionType(&metav1.TypeMeta{}, &metav1.TypeMeta{}); err != nil {
return err
}
scheme.AddKnownTypes(SchemeGroupVersion,
&Pod{},
&PodList{},
&PodStatusResult{},
&PodTemplate{},
&PodTemplateList{},
&ReplicationControllerList{},
&ReplicationController{},
&ServiceList{},
&Service{},
&ServiceProxyOptions{},
&NodeList{},
&Node{},
&NodeConfigSource{},
&NodeProxyOptions{},
&Endpoints{},
&EndpointsList{},
&Binding{},
&Event{},
&EventList{},
&List{},
&LimitRange{},
&LimitRangeList{},
&ResourceQuota{},
&ResourceQuotaList{},
&Namespace{},
&NamespaceList{},
&ServiceAccount{},
&ServiceAccountList{},
&Secret{},
&SecretList{},
&PersistentVolume{},
&PersistentVolumeList{},
&PersistentVolumeClaim{},
&PersistentVolumeClaimList{},
&PodAttachOptions{},
&PodLogOptions{},
&PodExecOptions{},
&PodPortForwardOptions{},
&PodProxyOptions{},
&ComponentStatus{},
&ComponentStatusList{},
&SerializedReference{},
&RangeAllocation{},
&ConfigMap{},
&ConfigMapList{},
)
return nil
}
可以看出实际是调用了上述的scheme.addKnownTypes方法,从而把Pod、Service等常见资源注册到了Scheme中。注意,另外会添加internel对象资源到Scheme中,见k8s.io/kubernetes/pkg/apis/core/register.go,其实类型与前面v1版本的对象一致,这是版本为"__internel"
第二点:在Registry中Enable对应的GroupVersion,这里主要做了三个步骤:Enable对应的GV;生成对应组的metadata对象GrupMeta并注册到对应的组中;生成对应的RESTMapper对象。
type GroupMeta struct {
// GroupVersion represents the preferred version of the group.
GroupVersion schema.GroupVersion // 优先的组、版本信息
// GroupVersions is Group + all versions in that group.
GroupVersions []schema.GroupVersion // 本组中的所有的版本信息
// SelfLinker can set or get the SelfLink field of all API types.
// TODO: when versioning changes, make this part of each API definition.
// TODO(lavalamp): Combine SelfLinker & ResourceVersioner interfaces, force all uses
// to go through the InterfacesFor method below.
SelfLinker runtime.SelfLinker // SelfLinker这块一直没有研究
// RESTMapper provides the default mapping between REST paths and the objects declared in a Scheme and all known
// versions.
RESTMapper meta.RESTMapper // Kind与资源的对应关系,譬如Kind为Pod,那么对应的资源有:pod, pods
// InterfacesFor returns the default Codec and ResourceVersioner for a given version
// string, or an error if the version is not known.
// TODO: make this stop being a func pointer and always use the default
// function provided below once every place that populates this field has been changed.
InterfacesFor func(version schema.GroupVersion) (*meta.VersionInterfaces, error) // 一般指向DefaultInterfacesFor函数,基于下面的map来返回对应的VersionInterfaces
// InterfacesByVersion stores the per-version interfaces.
InterfacesByVersion map[schema.GroupVersion]*meta.VersionInterfaces // 存储每个组版本的VersionInterfaces,在VersionInterfaces中,
// 一般存储两个对象:runtime.ObjectConvertor: scheme、MetadataAccessor实现了资源对象版本转换与元数据获取接口
}
RESTMapper主要用于Kind与Resource之间的对应关系,具体的类型为GroupVersionKind与GroupVersionResource,这里存储的是近似的关系,而不一定是完全准确的,譬如Pod这种数据,那么会生成pod和pods这两种名字的资源,而Ingress,则生成ingress和ingresses两个名字的资源。下面我们看看生成RESTMapper的代码。
func (gmf *GroupMetaFactory) newRESTMapper(scheme *runtime.Scheme, externalVersions []schema.GroupVersion, groupMeta *apimachinery.GroupMeta) meta.RESTMapper {
// the list of kinds that are scoped at the root of the api hierarchy
// if a kind is not enumerated here, it is assumed to have a namespace scope
rootScoped := sets.NewString()
if gmf.GroupArgs.RootScopedKinds != nil {
rootScoped = gmf.GroupArgs.RootScopedKinds
}
ignoredKinds := sets.NewString()
if gmf.GroupArgs.IgnoredKinds != nil {
ignoredKinds = gmf.GroupArgs.IgnoredKinds
}
// 创建缺省的RESTMapper
mapper := meta.NewDefaultRESTMapper(externalVersions, groupMeta.InterfacesFor)
for _, gv := range externalVersions {
for kind := range scheme.KnownTypes(gv) { // 遍历scheme中的Kind
if ignoredKinds.Has(kind) { // 不需要放到资源中的资源数据类型,在Install中定义的GroupMetaFactory
continue
}
scope := meta.RESTScopeNamespace
if rootScoped.Has(kind) { // Root资源,没有Namespace的资源,在Install中定义的GroupMetaFactory。
// 一般有四种:Node,Namepsace,PersistentVolume,ComponentsStatus
scope = meta.RESTScopeRoot
}
mapper.Add(gv.WithKind(kind), scope) // Kind 《-》 Resource 映射
}
}
return mapper
}
到这里为止,终于分析完了ResourceEncodingConfig的缺省配置,从这里我们可以看出Registry的各个成员是如何赋值的。并且ResourceEncodingConfig的缺省配置是从Restistry中的组元数据(GroupMeta)中获得。下面我们在分析一下,针对缺省数据的覆盖是如何实现的。
首先,我么你研究一下,StorageSerializationOptions的缺省构建代码如下所示。
func NewStorageSerializationOptions() *StorageSerializationOptions {
return &StorageSerializationOptions{
DefaultStorageVersions: legacyscheme.Registry.AllPreferredGroupVersions(),
StorageVersions: legacyscheme.Registry.AllPreferredGroupVersions(),
}
}
我们在启动API Server的时候,可以通过--storage-versions启动参数来指定,哪些group使用什么版本,甚至于把某个group的版本迁移到另外一个group、version来存储。
这个过程是在:StorageGroupsToEncodingVersion方法中完成的,并生成一个group到GroupVersion的映射。
这个映射在NewStorageFctory方法中,传入的参数为storageEncodingOverrides,然后通过下面的函数调用与缺省的ResourceEncodingConfig进行归并,如下所示:
resourceEncodingConfig := resourceconfig.MergeGroupEncodingConfigs(defaultResourceEncoding, storageEncodingOverrides)
注意这里会设置外部编码(对应于ETCD存储编码)和内部编码(对应于内存对象编码)。这里Overrinding的是外部编码,而内部编码的Group仍保持不变,Version仍然是"__internal"。
这里是更底层的编码配置,在例子中,我们带入了如下所示的参数:
[]schema.GroupVersionResource{
batch.Resource("cronjobs").WithVersion("v1beta1"),
storage.Resource("volumeattachments").WithVersion("v1beta1"),
admissionregistration.Resource("initializerconfigurations").WithVersion("v1alpha1"),
},
这将标识batch这个group下的crontjobs资源,它将采用不同的版本v1beta1来存储。
具体代码不难,就不细究了
到这里为止,基本把整个API Server的存储体系分析完了,API Server的框架还是比较复杂的,并且代码量也很大, 分析到这里,感觉还是有不少地方需要再去仔细研究,本文也就是作为一个指引吧。下次看代码的时候可以在基础上继续往下深挖,不至于每次都从头开始。
作者:何约什
链接:https://www.jianshu.com/p/daa4ff387a78
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。