当前位置: 首页 > 知识库问答 >
问题:

在一个应用程序中使用多个不同的kafka集群

百里业
2023-03-14

这可能不是典型的设置,但由于更高的决策,我们最终在一个应用程序中有多个 kafka 集群,每个集群有多个主题,每个集群可能具有不同的序列化策略。Json/avro.avro可能与融合的架构注册表一起使用,或者使用单个对象编码。

好吧,我通过构建自己的抽象和注册中心,分析配置并手动创建大部分内容,以某种方式实现了它,但我觉得我需要在几个地方多次重复主题名称、模式注册url等内容,以便创建所有需要的bean。丑得要命。

我想问一下,是否有更好的方法和支持,我可能忽略了这一点。

我需要创建kafka集群的N个表示,配置一次。为给定的kafka集群配置相应的主题,为主题配置合流模式注册表(如果适用)等,以便我可以创建Avro模式文件的实例,将其发送到KafkaTemplate,它就会工作。

共有1个答案

荀裕
2023-03-14

这取决于复杂性和配置的差异程度,以及这是否会有所帮助,但是您可以在@KafkaListener和每个KafkaTemplate中覆盖单个Kafka属性(例如引导服务器,反序列化程序等)。

例如

@KafkaListener(id = "two", topics = "two",
        properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
    System.out.println("2: " + new String(in));
}

编辑

覆盖可以外部化-如下所示:

@KafkaListener(id = "so67959209", topics = "so67959209",
        properties = "${consumer.overrides.one}")
public void listen(byte[] in) {

}

consumer.overrides.one=bootstrap.servers:localhost:9092\n \
 key.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer\n \
 value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer

重要的是覆盖是原始的Kafka属性名称,而不是引导版本-例如bootstrap.servers与引导的引导服务器。

 类似资料:
  • 这是否意味着我的整个应用程序只能连接到单个Kafka集群,或者KafkaStreams的每个实例只能连接到单个集群? 我可以创建多个连接到不同集群的具有不同属性的KafkaStreams实例吗?

  • 也许这很奇怪,我只需要在一个配置文件中添加bootstrap.servers就可以了?找不到任何关于那件事的信息...

  • 我想在1个应用程序中设置2个Firebase,但有不同的持久性。 一个是启用,第二个是禁用。 请通知我如何设置。 一个应用程序中的多个Firebase项目 但这并没有解释, 如何为第二个Firebase设置持久性。 我要启用第一火线的持久性。 正如我们所看到,getInstance是静态方法, 我们怎么知道, 将属于第一或第二消防基地。

  • 我有 和 两个不同的ApplicationPath和类如下所示。 如果我在param-value中取出一个包,这是有效的,如果我更改@Path注释之一,这也是有效的,所以这是我的配置的问题? 我用的是1.10号球衣。谢谢大家。

  • 问题内容: 我想通过JDBC连接到两个不同的Oracle数据库(一个8.0.5.0.0和一个12c)。我确实有两个JDBC驱动程序,它们可以通过简单的“ hello world”应用程序分别成功地连接到相应的DB。下面,我将它们都放在一个Java应用程序中,不幸的是,该应用程序不再起作用(加载了两个驱动程序)。 我已经阅读了这篇文章:从SAMEVENDOR处理多个JDBC驱动程序。提到的选项1可能

  • 我见过,但对于我的(简单的)用例来说,它似乎有些过头了。 我也知道,但我不想仅仅为此编写和维护代码。 我的问题是:有没有一种方法可以用kafka原生工具实现这个主题调度,而不用自己写一个Kafka-Consumer/Producer?