当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka-动态创建流

咸玄天
2023-03-14

我需要从配置文件动态创建kafka流,其中包含每个流的源主题名称和配置。应用程序需要有几十个Kafka流和流将是不同的每个环境(例如阶段,prod)。它可能做到这一点与sping-kafka库?

我们可以通过Kafka流轻松做到这一点:

@Bean
public List<KafkaStreams> kafkaStreams() {
    return streamRouteProperties.stream()
            .map(routeProperty -> createKafkaStream(routeProperty))
            .collect(toList());
}

private KafkaStreams createKafkaStream(KafkaConfigurationProperties kafkaProperties) {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<Object, String> stream = builder.stream(kafkaProperties.getTopicName());
    Topology topology = builder.build();
    StreamsConfig streamsConfig = new StreamsConfig(kafkaProperties.getSettings());
    return new KafkaStreams(topology, streamsConfig);
}

我们需要实现springSmartLifecycle接口,这样所有流都将自动启动和关闭。

是否可以使用sping-kafka做同样的事情?正如我所看到的,我们需要在代码中创建每个Kafka流,我看不到如何使用StreamsBuilderFactoryBean创建Kafka流列表的可能性。

@Bean
public KStream<?, ?> kStream(StreamsBuilder streamsBuilder) {
    Consumed<String, String> consumed = ..;
    KStream<String, String> kStream = streamsBuilder.stream(topicName, consumed);
    kStream.process(() -> eventProcessor);
    return kStream;   
}

@Bean
public FactoryBean<StreamsBuilder> streamsBuilder() {
    return new StreamsBuilderFactoryBean(streamsConfig);
}

但是如何使用StreamsBuilderFactoryBean动态创建Kafka流列表呢?

共有1个答案

晋鹤轩
2023-03-14

StreamsBuilderFactoryBean只是为Spring应用程序上下文带来了一些自以为是的、方便的API,但这并不意味着您应该始终与之捆绑在一起。

幸运的是,StreamsBuilderFactoryBean与常规Kafka流相比没有太大的价值。它的最大功能是对内部创建的KafkaStreams进行lfecycle控制。

您可以随意使用原始的Kafka Streams API,不要试图将代码过于复杂化,使其成为基于StreamsBuilderFactoryBean的您的需求,StreamsBuilderFactoryBean实际上是为一组静态选项设计的。

 类似资料:
  • 问题内容: 你好,我有这个设置 我需要为每个按钮获取以下内容 在Java中是否可以为我声明的每个按钮动态创建此按钮?因为当我有5个按钮时,我不需要3x5 = 15行代码,而是只有几行具有动态创建的按钮。 问题答案: 编写一个小循环并将您的按钮存储在数组中:

  • 你好,我已经准备好了 我需要为每个按钮获得以下内容 在Java中,是否可以为我声明的每个按钮动态创建它?因为当我有5个按钮时,我不希望3x5=15行代码,而只希望有几行动态创建的按钮。

  • 问题内容: 给定一个类名,我想动态创建一个Groovy类,向其添加属性和方法。我使用创建新类 对于我使用的方法 其中it.key是字符串(方法名),it.value是闭包。这很方便,因为我可以指定方法参数类型并进行类型检查。但是,如果不给它赋值,就无法指定动态创建的属性类型。我可以通过显式定义属性的getter和setter来解决此问题。这可行,但是metaClass.name = value或m

  • 问题内容: 我在mysql上创建数据库。首先创建主体表,每个表平均有30列。日志表的标准是引用表的pk加上每列*2。像这样: 参考表: 日志表: 现在,我想要创建一个过程,在该过程中,我将表名作为参数传递,并生成表日志查询并执行它。 做这个的最好方式是什么? 问题答案: 为了使一个字符串代表一个表(或数据库)名称,您将需要用变量连接查询字符串,并在存储过程中准备/执行一条语句。这是一个基本示例。

  • 问题内容: 我陷入GWT CellTable的问题。我需要动态创建单元表,而我没有实体(Bean)类。我已经看到了所有celltable的示例,并且在没有实体类的情况下进行了大量搜索。 我需要根据数据库中存储的一些元数据动态填充表。我可以创建表结构 考虑有两个类,一个是GRID,另一个是COLUMN,用于元数据和列定义。GRID将具有COLUMNS的列表作为列定义 现在,我需要从数据库中获取网格并

  • 问题内容: 我需要动态创建一个类。为了更详细,我需要动态创建Django类的子类。 通过“动态”,我打算基于用户提供的配置创建一个类。 例如 我想要一个命名为该类的子类的类。 该类应具有所选属性的列表。 ....在这种情况下 有什么有用的提示吗?:) 问题答案: 您可以通过调用内置函数并传递适当的参数来动态创建类,例如: 它适用于新型类。我不确定这是否也适用于老式类。