当前位置: 首页 > 知识库问答 >
问题:

Spring kafka在运行时重新创建kafka流拓扑

慕容玉书
2023-03-14

我有一个应用程序,是基于Spring启动,SpringKafka和Kafka流。当应用程序启动时,它会创建带有默认主题列表的kafka流拓扑。我需要做的是在运行时编辑/重新创建拓扑。例如,当应用程序已经运行时,有新的主题名称出现,我想将此主题添加到我的拓扑结构中。目前,我正在考虑以某种方式删除现有的拓扑,关闭并清理KafkaStreams,在创建拓扑但使用新主题名称的地方运行逻辑,并再次启动KafkaStreams。我不想重新启动我的应用程序。有人能建议我如何在运行时做到这一点吗?

共有1个答案

冷俊健
2023-03-14

我找到了一个解决方案。我扩展了StreamsBuilderFactoryBean:

@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
@Primary
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration kStreamsConfigs) {
    return new DynamicStreamsBuilderFactoryBean(kStreamsConfigs);
}

public static class DynamicStreamsBuilderFactoryBean extends StreamsBuilderFactoryBean {

    private StreamsBuilder instance;

    public DynamicStreamsBuilderFactoryBean(final KafkaStreamsConfiguration streamsConfig) {
        super(streamsConfig);
    }

    @Override
    public boolean isSingleton() {
        return false;
    }

    @Override
    protected synchronized StreamsBuilder createInstance() {
        if (instance == null) {
            instance = new StreamsBuilder();
        }
        return instance;
    }

    @Override
    public synchronized void stop() {
        instance = null;
        super.stop();
    }
}

当我构建拓扑时,我不使用StreamsBuilder,而是使用StreamsBuilderFactoryBean#getObject():

@Component

公共类动态流{

private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;

public void init() {
    StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
        //build topology
}

//call this method when stream reconfiguration is needed
public void reinitialize() {
    streamsBuilderFactoryBean.stop();
    init();
    streamsBuilderFactoryBean.start();
}

}

 类似资料:
  • 我们有一个kafka streams Spring Boot应用程序(使用spring-kafka),这个应用程序目前从上游主题读取消息,应用一些转换,并将它们写入下游主题,它不做任何聚合或联接或任何高级kafka streams功能。 代码当前类似于

  • 在准备拓扑优化时,我偶然发现了以下几点: 目前,Kafka Streams在启用时会执行两种优化: 1-源KTable将源主题重新用作变更日志主题。 2-如果可能,Kafka流会将多个重新分区主题压缩为单个重新分区主题。 这个问题是关于第一点的。我不完全明白这里发生了什么。只是为了确保我没有在这里做任何假设。有人能解释一下,以前是什么状态吗: 1-KTable使用内部变更日志主题吗?如果是,有人能

  • 我终于觉得我有了一个在redis数据库上写的toopology。我有一个插销要打印,还有一个插销要插入Redis。但当我尝试启动拓扑时,会出现以下错误:

  • 我正在尝试使用Eclipse在Linux中运行Storm启动示例。我收到以下错误和函数从未被调用。 错误: 我的拓扑类: 我正在虚拟机环境中工作,所以不知道这是否是由于安装了Zookeeper。有什么想法吗?

  • 问题内容: 我知道我可以使用它来创建DDL创建触发器; 问题在于,该触发器将在“创建序列”之类的DDL上运行;如何仅对“创建表” DDL执行此操作? 问题答案: CREATE OR REPLACE TRIGGER create_table_trigger AFTER CREATE ON SCHEMA BEGIN IF SYS.DICTIONARY_OBJ_TYPE = ‘TABLE’ THEN .