当前位置: 首页 > 工具软件 > Guice > 使用案例 >

【Elasticsearch】模块管理 Guice

西门正平
2023-12-01

【使用Guice】

es每个模块都是通过 Service(实现业务功能) 和 Module(配置绑定信息) 类共同实现模块功能,以 CLusterModule 为例:
Module    ->    AbstractMoule    ->    ClusterModule(继承configure()方法来进行模块配置)
定义好的模块由ModulesBuilder类统一管理,ModulesBuilder是ES对Guice的封装,内部调用Guice接口,主要对外提供两个方法。
· add方法:添加创建好的模块
· createInjector方法:调用Guice.createInjector创建并返回Injector,后续通过Injector获取相应Service类的实例。
使用ModulesBuilder进行模块管理的伪代码示例:
ModulesBuilder modules = new ModulesBuilder();
//以Cluster模块为例ClusterModule clusterModule = new ClusterModule();
modules.add(clusterModule);
//省略其他模块的创建和添加
...
//创建Injector,并获取相应类的实例
injector = modules.createInjector();
setGatewayAllocator(injector.getInstance(GatewayAllocator.class)
模块化的封装让 ES 易于扩展,插件本身也是一个模块,节点启动时被模块管理器添加进来

【模块管理】

Elasticseach 将 Guice 代码直接内置到源码中,定义了一系列的集合,在节点 Node 初始化时注入 Module 到缓存中。在后续需要用到某个类的实例时调用 Injector 来获取实例。Guice 在 Es 中所在的包:
org.elasticsearch.common.inject
ModulesBuilder 复制模块的注入和保存:
public class ModulesBuilder implements Iterable<Module> {
    private final List<Module> modules = new ArrayList<>();
    // 模块注入
    public ModulesBuilder add(Module... newModules) {
        Collections.addAll(modules, newModules);
        return this;
    }
    @Override
    public Iterator<Module> iterator() {
        return modules.iterator();
    }
    // 通过 Injector.getInstance(clazz) 获取某个类实例
    public Injector createInjector() {
        Injector injector = Guice.createInjector(modules);
        ((InjectorImpl) injector).clearCache();
        // in ES, we always create all instances as if they are eager singletons
        // this allows for considerable memory savings (no need to store construction info) as well as cycles
        ((InjectorImpl) injector).readOnlyAllSingletons();
        return injector;
    }
}

查看注入的一些 modules:

【log】ModulesBuilder.add 模块注入 module=org.elasticsearch.cluster.ClusterModule
【log】ModulesBuilder.add 模块注入 module=org.elasticsearch.indices.IndicesModule
【log】ModulesBuilder.add 模块注入 module=org.elasticsearch.gateway.GatewayModule
使用 Guice 的步骤就是通过  Injector.getInstance(clazz) 可以获取到目标类的实例,这个类是通过  Guice.createInjector(modules) 生成的,他将 modules 缓存集合传递进去。
Guice.createInjecter 代码如下:
public static Injector createInjector(Stage stage, Iterable<? extends Module> modules) {
    // 构造一个 Inject 实例
    // 调用 addModules() 将所有模块进行注入
    return new InjectorBuilder().stage(stage).addModules(modules).build();
}

添加进去的模块会保存到 InjectorShell.Builder 的 modules 属性中。
InjectorShell.Builder.addModules() 代码如下:
void addModules(Iterable<? extends Module> modules) {
    for (Module module : modules) {
        this.modules.add(module);
    }
}

然后调用 build() 创建 Injector 对象:
InjectorShell.Builder.build()
List<InjectorShell> build(Initializer initializer, BindingProcessor bindingProcessor,Stopwatch stopwatch, Errors errors) {
    // 省略
    List<InjectorShell> injectorShells = new ArrayList<>();
    // InjectorShell 对象引用了 injector
    // 这里是集合最终返回集合第一个 InjectorShell.getInjector() 也就是调用 shells.get(0).getInjector()
    injectorShells.add(new InjectorShell(elements, injector));
    // recursively build child shells
    PrivateElementProcessor processor = new PrivateElementProcessor(errors, stage);
    processor.process(injector, elements);
    for (Builder builder : processor.getInjectorShellBuilders()) {
        injectorShells.addAll(builder.build(initializer, bindingProcessor, stopwatch, errors));
    }
    stopwatch.resetAndLog("Private environment creation");
    return injectorShells;
}

于是核心的 Injector 就得到了,这个类的作用如下:
容器存在的意义主要是实现依赖注入,也就是通过容器获取一个目标类的对象时,这个对象依赖的其他对象能够自动注入然后拼装成最终的我们需要的对象进行返回。当然依赖关系程序需要从一个地方去读取。读取的依据就是模块内声明的绑定。
这个绑定就通常使用 Guice 需要实现的 AbstractModule.configure() 接口。调用 bind() 连接我们的接口和接口实现类,以及依赖,以及 @Inject 注解标注的依赖。这样 Injector 就能根据这个绑定帮我们自动注入依赖并生成最终对象。
以 ClusterModule 为例查看绑定过程.ClusterModules.configure() 代码如下:
protected void configure() {
    // asEagerSingleton 标记为单例模式
    // bind().toInstance() 绑定到这个类已创建的一个对象上,后续直接获取该对象
    // bind().to() 绑定接口实现类或者子类,后续直接过去子类对象
    bind(GatewayAllocator.class).asEagerSingleton();
    bind(AllocationService.class).toInstance(allocationService);
    bind(ClusterService.class).toInstance(clusterService);
    bind(NodeConnectionsService.class).asEagerSingleton();
    bind(MetadataDeleteIndexService.class).toInstance(metadataDeleteIndexService);
    bind(MetadataIndexStateService.class).asEagerSingleton();
    bind(MetadataMappingService.class).asEagerSingleton();
    bind(MetadataIndexAliasesService.class).asEagerSingleton();
    bind(MetadataUpdateSettingsService.class).asEagerSingleton();
    bind(MetadataIndexTemplateService.class).asEagerSingleton();
    bind(IndexNameExpressionResolver.class).toInstance(indexNameExpressionResolver);
    bind(DelayedAllocationService.class).asEagerSingleton();
    bind(ShardStateAction.class).asEagerSingleton();
    bind(MappingUpdatedAction.class).asEagerSingleton();
    bind(TaskResultsService.class).asEagerSingleton();
    bind(AllocationDeciders.class).toInstance(allocationDeciders);
    bind(ShardsAllocator.class).toInstance(shardsAllocator);
}

通过上面的绑定,在后续调用 Injector.getInstance(clazz) 可以直接获取到目标实例,比如  GatewayAllocator.class 这个服务,Guice 在创建这个服务对象(asEagerSingleton标记为单例)时同时会注入依赖,而依赖是通过 @Inject 注解标注的,Guice 也正是识别这个注解来自动注入下来。
GatewayAllocator 类构造函数如下:
@Inject
public GatewayAllocator(RerouteService rerouteService,
                        TransportNodesListGatewayStartedShards startedAction,
                        TransportNodesListShardStoreMetadata storeAction) {
    // @Inject 标注构造函数,依赖会由Guice自动注入
    this.rerouteService = rerouteService;
    this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction);
    this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction);
}

在节点 new Node  初始化节点对象时会调用 Injector.getInstance() 获取 GatewayAllocator 这个类:
Node 构造函数如下:
clusterModule.setExistingShardsAllocators(injector.getInstance(GatewayAllocator.class));

接下来看 Injector.getInstance() 是如何找到的对象。Injector 实现类是 InjectorImpl。
InjectorImpl.getProvider() type=org.elasticsearch.gateway.GatewayAllocator    =>
InjectorImpl.getProviderOrThrow() key=Key[type=org.elasticsearch.gateway.GatewayAllocator, annotation=[none]]
InjectorImpl.getInstance() 代码如下:
@Override
public <T> T getInstance(Class<T> type) {
    return getProvider(type).get();
}

InjectorImpl.getProviderOrThrow() 代码如下:
<T> Provider<T> getProviderOrThrow(final Key<T> key, Errors errors) throws ErrorsException {
    // 根据key获取 InternalFactory
    // Key[type=org.elasticsearch.gateway.GatewayAllocator, annotation=[none]]
    final InternalFactory<? extends T> factory = getInternalFactory(key, errors);
        // 省略
    // 获取依赖
    final Dependency<T> dependency = Dependency.get(key);
    // 返回一个 Provider<T>
    // 后续调用 Provider.get() 获取实例
    return new Provider<T>() { 省略}
}

最终调用的  Provider<T>.get() 获取目标实例:
new Provider<T>() {
    @Override
    public T get() {
        final Errors errors = new Errors(dependency);
        try {
            T t = callInContext(new ContextualCallable<T>() {
                @Override
                public T call(InternalContext context) throws ErrorsException {
                    // 注入依赖
                    context.setDependency(dependency);
                    try {
                        return factory.get(errors, context, dependency);
                    } finally {
                        context.setDependency(null);
                    }
                }
            });
            errors.throwIfNewErrors(0);
            return t;
        } catch (ErrorsException e) {
            throw new ProvisionException(errors.merge(e.getErrors()).getMessages());
        }
    }
    @Override
    public String toString() {
        return factory.toString();
    }
}

【自定义模块】

创建一个模块,并在 Node 启动时加载这个模块。并获取其实例。

【处理器缓存】

Elasticsearch 内部很多地方都使用了 Guice 容器思想,将函数、处理类等等提前缓存到各种集合中,根据标志从集合中获取出具体要执行的函数、处理类进行调用。大概分为下面几种:
Transport.RequestHandlers.registerHandler()    注册节点Node间通信时,请求处理器。缓存在 Transport.RequestHandlers 类的 requestHandlers 对象中。
ActionModule.setupActions()        缓存节点Node通信时,节点请求处理。缓存在 NamedRegistry 类的 registry 对象中。
RestController.registerHandler()     缓存http处理器,根据不同url处理请求。缓存在 RestController 类的 handlers 对象中。
 类似资料: