++++++++++++++++++++++++++++++++++++ 编辑 运行以下测试,每个主题都有一个分区。在测试运行之前,topic1中有10条消息,Topic2中有10条消息。运行代码并让10条topic1消息得到处理,但是当topic2消息得到处理时,我向topic1发送了更多的消息,但在处理完来自topic2的所有预先存在的消息之前,监听器没有处理这些消息。
我正在用java编写一个简单的Kafka使用者,它被配置为读取多个主题。目前,让我们假设两个主题(topic1和Topic2),并为两个主题设置一个分区。 Kafka用户从topic1和Topic2读取的顺序是什么。如果这两个主题都有,假设已经发布了100条消息。 使用者首先从topic1读取所有消息,然后再从topic2读取? 用户按时间顺序阅读,将来自两个主题的消息混合在一起? 我看了Kafk
null null
我正在数据库中为主题外部化kafka消费者元数据,包括消费者组和组中消费者的数量。 Consumer\u info表具有 主题名称,消费者组名称,组中的消费者数量消费者类名称 在app server启动时,我正在读取表并根据表中设置的数字创建使用者(线程)。如果使用者组计数设置为3,我将创建3个使用者线程。这基于给定主题的分区数 现在,如果我需要横向扩展,我如何将属于同一组的消费者分布在多个应用服
在某些情况下,我使用Kafka流对主题的小内存(hashmap)投影进行建模。K,V缓存确实需要一些操作,因此它不是GlobalKTable的好例子。在这种“缓存”场景中,我希望我的所有兄弟实例都具有相同的缓存,因此我需要绕过消费者组机制。 要实现这一点,我通常只需使用随机生成的应用程序ID启动我的应用程序,因此每个应用程序每次重新启动都会重新加载主题。唯一的警告是,我最终会有一些消费者群体在Ka
我有两个vm服务器(比如S1和S2),需要在集群模式下安装kafka,其中只有一个分区和两个副本(一个是领导者本身,另一个是追随者),以确保可靠性。 从这个集群设置中获得了高层次的想法,希望确认以下策略是否正确。 首先将zookeeper设置为两个节点上的群集,以实现高可用性(HA)。如果我只在单个节点上设置zk,然后该节点关闭,则整个集群将关闭。正当在最新的Kafka版本中也必须使用zk吗?看来
我们希望获得Kafka消费群体指标(例如,节流和字节率)。 我们已经使用以下工具完成了此操作: Kafka消费者Java应用程序的JMX Mbean CLI实用程序: bin/kafka-consumer-groups.sh--描述--组group_name--bootstrap-serverlocalhost: port . 问题:这可以通过使用一些Java库以编程方式完成吗? 到目前为止,我们
我运行这个命令: kafka使用者组--引导服务器localhost:9092--组我的使用者组--重置偏移量--最早--执行--主题my-topic-1 它给出了错误: 命令的语法不正确。 根据此命令的帮助结果,我键入的内容似乎是正确的。 我在这里犯了什么错误?
我想检查手动分配给特定主题的消费者组的滞后,这可能吗。我使用的是Kafka-0.10.0.1。我用的是shKafka跑步课。shKafka。管理ConsumerGroupCommand-new consumer-description-bootstrap server localhost:9092-group test但它说不存在组,所以我想知道当我们手动分配分区时,是否可以检查使用者的延迟。
跟进这个问题——我想知道消费者组和偏移过期之间的语义学。总的来说,我很想知道,kafka协议如何确定一些特定的偏移量(对于消费者组、主题、分区组合)过期?它是基于作为组协议一部分的消费者的定期提交,还是在所有消费者被视为已死/关闭后应用?我认为这可能会在处理数据不经常生成的主题分区时产生影响。在我的例子中,我们有一个来自相当主题的消费者组读取(产生的数据不多)。由于消费者组不定期提交任何偏移量,我
我有一个用例,其中消息需要广播到水平可扩展、无状态的应用程序集群中的所有节点,我正在考虑Kafka。由于集群的每个节点都需要接收主题中的所有消息,因此集群的每个节点都需要有自己的消费者组。 这里可以假设消息量不是很高,以至于每个节点都无法处理所有消息。 为了用Kafka实现这一点,当从主题消费时,我最终会使用消费者流程的instanceId(或某个唯一标识符)作为消费者组id。这将推高消费群体的数
Kafka消费者是否一直在检查代理(Kafka服务器)的运行状况,反之亦然 让我们说,无论如何,消费者和经纪人都知道彼此的健康状况,那么消费者将如何准确地从分区中读取 假设一个主题有48个分区,该主题有两个使用者组,那么有多少线程将使用所有分区中的数据
我们面临的情况是,只要有滞后,我们的akka流kaka消费者处理率就会下降。当我们在分区中没有任何延迟的情况下启动它时,处理速度会突然增加。 MSK群集-10个主题-每个40个分区= 为了在系统中实现高吞吐量和并行性,我们实现了akka-stream-kafka消费者分别订阅每个主题分区,从而在消费者和分区之间实现1:1映射。 这是消费者设置: ec2服务实例数-7 每个服务为10个主题中的每一个
我有一个用例,在这个用例中,我有3个Kafka消费者向一个主题写作,每个消费者中的消息都需要按顺序处理。在这种情况下,如果某个消费者中存在延迟,则需要更早处理的消息将被丢弃(写入条件)。那么,有没有一种方法可以维持这些消息的顺序呢。
我知道,如果我们在消费者组中有多个分区和几乎相同数量的消费者,那么处理速度会加快。如果我们想保持事件的顺序并在收到每个事件时处理它,我们如何使用多个分区和消费者来实现这一点。 在我的用例中,按顺序处理事件非常关键,否则系统会崩溃。我想使用多个分区来增加并行性,但不知何故“让它们按顺序”。