当前位置: 首页 > 知识库问答 >
问题:

Spring Boot Kafka:对特定主题的所有实例使用相同的消息

桓喜
2023-03-14

我有一个spring boot应用程序(比方说它叫app-1),它连接到一个kafka集群,并从一个特定的主题进行消费,比方说这个主题叫做“foo”。当另一个应用程序(比如称为app-2)将新的foo项导入数据库时,主题foo总是会收到一条消息。该主题主要用于第三个应用程序(比如app-3),它向可能对这个新foo项目感兴趣的人发送一些电子邮件通知。App-3是集群的,这意味着它有多个实例同时运行。Kafka自动平衡所有这些实例之间的foo主题消息,因为它们使用相同的consumer-id。这很好,对于app-3来说,这实际上是需要的。

然而,在app-2中,来自foo主题的消息用于缓存逐出。基本上,逻辑是,如果有一个新的foo项,那么可能应该清除当前现有的缓存,因为它们的内容取决于foo项。问题是app-2也是集群的,这意味着默认情况下,kafka逻辑,每个实例将只接收发送到foo主题的部分消息。对于这个特定的应用程序tho,这不能正常工作,因为每当有新的foo项时,所有实例都需要知道它,因为它们都需要清除本地缓存。

据我所知,如果我想保持当前的逻辑,我有以下两个选择:

  • 为app-2的所有实例引入分布式缓存,以便它们都共享相同的缓存。然后如果只有一个实例收到foo-Item也没关系,因为缓存驱逐也会影响其他实例的缓存;即使他们从未了解过foo-Item。我想避免这种解决方案,因为分布式缓存会增加显着的复杂性和开销。
  • 不知何故,设法为app-2的每个实例使用不同的消费者ID。然后他们将被kafka视为不同的消费者,他们都将获得每个foo主题消息。但是,我甚至不知道如何以编程方式做到这一点。应用程序的代码不知道复制的实例,无法访问有关它是哪个节点的任何信息。如果我在启动时使用随机生成的字符串,那么每次此类实例重新启动时,它都将被视为新消费者,并且必须重新处理所有之前的消息。这也是不正确的行为。

这是我的底线问题:有没有可能让app-2的所有实例都接收来自foo主题的所有消息,而不完全破坏Kafka的工作方式?我知道使用kafka消息进行缓存逐出可能非常不传统,我完全能够找到一种不依赖于kafka主题消息的缓存逐出逻辑的替代机制。然而,这些应用程序只是为了演示,我认为如果不止一个应用程序阅读了这个主题,那就太酷了。但是,如果我最终不得不破解一个肮脏的变通方法来实现它,那么这对于演示来说也是不好的,我宁愿实现另一种缓存逐出的方法。

共有1个答案

唐涛
2023-03-14

正如您所提到的,您可以使用具有随机字符串的不同消费者ID。

如果从一开始就读取通知,那么您可能有消费者配置。AUTO\u OFFSET\u RESET\u CONFIG设置为消费者配置中的某个位置的“最早”。如果是这种情况,删除它可能会解决您的问题-当应用程序启动时,它只会在消费者开始侦听后收到发送的通知。

 类似资料:
  • 我有一个@KafkaListener方法来获取主题中的所有消息,但对于@Scheduled方法工作的每个间隔时间,我只获取一条消息。如何一次从topic获取所有消息? 这是我的课; 这是我在应用程序中的Kafka属性。yml; 还有我的KafkaConfiguration课程;

  • 目前,我有一个类分数,它允许我用三种不同的方式创建分数 对于一个整数,在这种情况下,给定的整数将是分子,分母将设置为1 有2个整数,分子和分母 最后一种方法是解析一个字符串,该字符串必须与REGEX-?\d/[1-9]\d* gcd将尽可能减少生成的分数。 我现在想实现的是,具有相同分子和分母的分数实例具有相同的引用例如。 应该返回true。 我研究了一些关于泛型和边界的章节,但我不确定这是否是我

  • 我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到

  • 我有这样的拓扑: 拓扑中最后提到的源是每个应用程序实例的特定主题。我希望该主题仅由该实例处理。此主题的数据由前一个处理器推送,基于哪个实例必须处理该消息。 但是一旦流启动,它会尝试将实例特定的主题分区也分配给其他实例。我们可以在Kafka流中实现这个要求吗? 我希望一个主题仅由特定实例处理。

  • 我们的拓扑使用从kafka主题获取消息。我们有约150个主题,包含约12个分区、8个storm执行器和2个storm节点上的任务。Storm版本1.0.5,Kafka经纪人版本10.0.2,Kafka客户端版本0.9.0.1。我们不会删除Kafka主题。 在某个时刻,我在worker中观察到大量重复的警告消息。日志 2018-05-29 14:36:57.928 o.a.s.k.KafkaUtil

  • 我有一个批处理作业,它将一天触发一次。要求是 使用该时间点上关于Kafka主题的所有可用消息 处理消息 如果进程已成功完成,则提交偏移量。 当前,我poll()while循环中的消息,直到ConsumerRecords.isEmpty()为true。当ConsumerRecords.isEmpty()为true时,我假设Topic在该时间点的所有可用记录都已被使用。应用程序维护偏移量并关闭kafk