我已经在kafka上工作了相当长的六个月,我对用户延迟和存储到主题分区中的数据有一些疑问。
问题1:最初,当我开始阅读Kafka并了解如何使用Kafka的功能时,我被教导说,一个只有一部分和一个复制因子的主题会创造奇迹。经过相当长的六个月的工作,将我的项目迁移到live之后,使用我的主题消息的消费者开始给我一个延迟。我阅读了许多关于消费者延迟的堆栈溢出答案,得出结论,如果我增加某个主题的分区和复制因子,就会删除它。我真正想知道的是,在六个月的数据流向某个主题后,这是否真的能消除我的消费者滞后,有人能帮我消除我经纪人的滞后吗。提前谢谢。
问题2:例如,如果我增加主题的部分和复制因子,那么从现在开始,我的制作人将如何将数据放入主题,在它是一个分区之前,数据将被淹没,并且我的消费者组只有一个消费者,这是默认的,并且只从一个分区获取。我的数据将分布在topic的分区中(即一个部分中的第一条消息和另一个分区中的下一条消息)。还有一件事,我需要在消费者方面进行任何更改,也像许多消费者一样,我必须开始从主题中读取数据,以便遵循顺序(即,我必须按照发布到主题的顺序获取数据)。。。
如果有人能对我面临的两个问题给出一个明确的解决方案,那就好了。提前感谢。
若你们的消费者有滞后,那个么你们产生(推送)主题的速度比你们阅读的速度快。增加分区数有助于并行运行多个使用者。例如,如果您有16个分区和4个使用者(具有相同的组id),那么每个使用者将读取4个分区。这减少了应由一个消费者处理的数据量(最好是4次)。
当您将消息推送到kafka时,您可以指定密钥。基于该密钥,kafka消费者决定消息应该转到哪个分区。
return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
若不指定键,消息将均匀分布在所有分区中。因此,如果您需要有顺序(例如,每个用户),您可以将key设置为用户id。在这种情况下,一个用户的所有消息将始终位于一个分区中,并按照您推送它们的顺序。
为了实现Kafka消费者对消息的一次处理,我一次提交一条消息,如下所示 上面的代码将消息的处理异步委托给下面的另一个类。 但是,这仍然不能保证只发送一次,因为如果处理失败,它可能仍会提交其他消息,并且以前的消息将永远不会被处理和提交,我在这里的选项是什么?
我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示: 流1: 流2: 我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream: 在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。 通过一个例子,我将解释我想做什么: 在窗口内发布以下消息: 流1 流2 加入流 出版的是什么 我想出版什么 总之,我只想在窗口中发布最新消息,而不是所
我已经研究阿帕奇Kafka一段时间了。 让我们考虑下面的例子。 考虑到我有3个分区的主题。我只有一个生产者和一个消费者。我在生成消息时没有指定key属性。 所以我知道在生产者方面,当我发布一条消息时,kafka使用的策略是将消息分配给这两个分区中的任何一个。 现在,我想知道的是,当我开始一个属于某个消费者群体的消费者听同一主题时,它将使用什么策略来从不同的参与者(因为有3个)中提取信息? 它是否会
我们有一个应用程序,它使用来自Kafka主题(3个分区)的消息,丰富数据,并将记录保存在DB(Spring JPA)中,然后将消息发布到另一个Kafka主题(在同一个代理上),所有这些都通过使用Camel 2.4.1和Spring Boot 2.1.7进行编排。释放 我们想为 kafka 消费者-生产者组合实现“exactly-once”语义。 消费者设置: 生产者设置: 豆接线: 骆驼路线: 但
我想使用Avro来序列化我的Kafka消息的数据,并想将其与Avro模式存储库一起使用,这样我就不必将模式包含在每条消息中。 将Avro与Kafka结合使用似乎是一件很流行的事情,许多博客/堆栈溢出问题/用户组等都提到了将模式Id与消息一起发送,但我找不到一个实际的示例来说明它应该去哪里。 我想它应该放在Kafka消息头的某个地方,但我找不到一个明显的地方。如果它在Avro消息中,则必须根据模式对
据我所知,生产者不得重试任何发送失败,消费者必须在执行某些处理之前提交,以提供最多一次的交付语义。但复制因素是否也与交付语义相关?《KafkaReactor》样本项目中的注释如下: 复制因子为1的主题与acks=0且不重试的生产者相结合,可确保在第一次尝试时无法发送到Kafka的消息被丢弃 在ApacheKafka中,复制因素应该是最多提供一次交付语义的因素吗?