我有一个spring boot应用程序(比方说它叫app-1),它连接到一个kafka集群,并从一个特定的主题进行消费,比方说这个主题叫做“foo”。当另一个应用程序(比如称为app-2)将新的foo项导入数据库时,主题foo总是会收到一条消息。该主题主要用于第三个应用程序(比如app-3),它向可能对这个新foo项目感兴趣的人发送一些电子邮件通知。App-3是集群的,这意味着它有多个实例同时运行。Kafka自动平衡所有这些实例之间的foo主题消息,因为它们使用相同的consumer-id。这很好,对于app-3来说,这实际上是需要的。
然而,在app-2中,来自foo主题的消息用于缓存逐出。基本上,逻辑是,如果有一个新的foo项,那么可能应该清除当前现有的缓存,因为它们的内容取决于foo项。问题是app-2也是集群的,这意味着默认情况下,kafka逻辑,每个实例将只接收发送到foo主题的部分消息。对于这个特定的应用程序tho,这不能正常工作,因为每当有新的foo项时,所有实例都需要知道它,因为它们都需要清除本地缓存。
据我所知,如果我想保持当前的逻辑,我有以下两个选择:
这是我的底线问题:有没有可能让app-2的所有实例都接收来自foo主题的所有消息,而不完全破坏Kafka的工作方式?我知道使用kafka消息进行缓存逐出可能非常不传统,我完全能够找到一种不依赖于kafka主题消息的缓存逐出逻辑的替代机制。然而,这些应用程序只是为了演示,我认为如果不止一个应用程序阅读了这个主题,那就太酷了。但是,如果我最终不得不破解一个肮脏的变通方法来实现它,那么这对于演示来说也是不好的,我宁愿实现另一种缓存逐出的方法。
正如您所提到的,您可以使用具有随机字符串的不同消费者ID。
如果从一开始就读取通知,那么您可能有消费者配置。AUTO\u OFFSET\u RESET\u CONFIG设置为消费者配置中的某个位置的“最早”。如果是这种情况,删除它可能会解决您的问题-当应用程序启动时,它只会在消费者开始侦听后收到发送的通知。
我有一个@KafkaListener方法来获取主题中的所有消息,但对于@Scheduled方法工作的每个间隔时间,我只获取一条消息。如何一次从topic获取所有消息? 这是我的课; 这是我在应用程序中的Kafka属性。yml; 还有我的KafkaConfiguration课程;
目前,我有一个类分数,它允许我用三种不同的方式创建分数 对于一个整数,在这种情况下,给定的整数将是分子,分母将设置为1 有2个整数,分子和分母 最后一种方法是解析一个字符串,该字符串必须与REGEX-?\d/[1-9]\d* gcd将尽可能减少生成的分数。 我现在想实现的是,具有相同分子和分母的分数实例具有相同的引用例如。 应该返回true。 我研究了一些关于泛型和边界的章节,但我不确定这是否是我
我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到
我有这样的拓扑: 拓扑中最后提到的源是每个应用程序实例的特定主题。我希望该主题仅由该实例处理。此主题的数据由前一个处理器推送,基于哪个实例必须处理该消息。 但是一旦流启动,它会尝试将实例特定的主题分区也分配给其他实例。我们可以在Kafka流中实现这个要求吗? 我希望一个主题仅由特定实例处理。
我们的拓扑使用从kafka主题获取消息。我们有约150个主题,包含约12个分区、8个storm执行器和2个storm节点上的任务。Storm版本1.0.5,Kafka经纪人版本10.0.2,Kafka客户端版本0.9.0.1。我们不会删除Kafka主题。 在某个时刻,我在worker中观察到大量重复的警告消息。日志 2018-05-29 14:36:57.928 o.a.s.k.KafkaUtil
我有一个批处理作业,它将一天触发一次。要求是 使用该时间点上关于Kafka主题的所有可用消息 处理消息 如果进程已成功完成,则提交偏移量。 当前,我poll()while循环中的消息,直到ConsumerRecords.isEmpty()为true。当ConsumerRecords.isEmpty()为true时,我假设Topic在该时间点的所有可用记录都已被使用。应用程序维护偏移量并关闭kafk