我编写了一个Kafka消费者(使用Spring Kafka),它从单个主题读取并且是消费者组的一部分。一旦消息被消费,它将执行所有下游操作并继续下一个消息偏移量。我已将其打包为WAR文件,我的部署管道将其推送到单个实例。使用我的部署管道,我可能可以将此工件部署到我的部署池中的多个实例。
但是,当我希望多个消费者作为我的基础设施的一部分时,我无法理解以下内容-
>
实际上,我可以在部署池中定义多个实例,并在所有这些实例上运行这场战争。这意味着,他们都在听同一个话题,属于同一个消费者群体,并且实际上会在他们之间划分分区。下游逻辑将按原样工作。这对于我的用例来说非常好,但是,我不确定这是否是要遵循的最佳方法?
在网上阅读时,我在这里和这里遇到了一些资源,人们在这里定义了一个消费者线程,但在内部创建了多个工作线程。还有一些示例,我们可以定义多个执行下游逻辑的消费者线程。考虑这些方法并将它们映射到部署环境,我们可以获得相同的结果(正如我上面的理论解决方案所能做到的那样),但机器数量更少。
就个人而言,我认为我的解决方案简单、可扩展但可能不是最佳的,而第二种方法可能是最佳的,但想知道您的经验、建议或我应该考虑的任何其他指标/约束?此外,我正在考虑我的理论解决方案,我实际上可以使用简单的机器作为Kafka消费者。
虽然我知道,但我还没有发布任何代码,如果我需要将此问题转移到其他论坛,请告诉我。如果您需要特定的代码示例,我也可以提供它们,但在我的问题中,我认为它们并不重要。
如果你目前的方法有效,坚持下去。这是简单而优雅的方法。
如果由于某种原因无法增加分区的数量,但需要更高级别的并行性,则只能使用方法2。但是你需要担心的是订单和比赛条件。如果您需要这样做,我建议您使用akka stream kafka库,它提供了正确处理偏移提交的工具,可以并行执行您需要的操作,然后合并回一个流,保留原始顺序等。否则,这些操作很容易出错。
您现有的解决方案是最好的。转移到另一个线程将导致偏移管理问题。Spring kafka允许您在每个实例中运行多个线程,只要您有足够的分区。
问题内容: 我编写了一个单一的Kafka使用者(使用Spring Kafka),该使用者从单个主题中读取内容,并且是使用者组的一部分。消耗完一条消息后,它将执行所有下游操作,并移至下一个消息偏移。我将其打包为WAR文件,并且我的部署管道将其推送到单个实例。使用部署管道,我可以将该工件部署到部署池中的多个实例。 但是,当我希望多个消费者作为基础架构的一部分时,我无法理解以下内容: 实际上,我可以在部
我正在使用Spring Kafka消费者。我已将并发设置为10,并创建了5个消费者(用于5个主题)。所以有50个Spring Kafka消费者线程。 Kafka消费者可以使用的最大线程数是多少?如何增加此线程池的大小?我查阅了spring文档,但没有发现任何相关内容。
Kafka的doc给出了一种方法,大约用以下描述: 每个线程一个消费者:一个简单的选择是为每个线程提供自己的消费者 我的代码: 但它不起作用,引发了一个异常: JAVAutil。ConcurrentModificationException:KafkaConsumer对于多线程访问不安全 此外,我还阅读了Flink(一个用于分布式流和批处理数据的开源平台)的源代码。Flink使用多线程消费程序与我
我开发了一个基于Java的Kafka消费者,其中每个消费者实例有100个线程,当消费过程开始时,每个线程都得到一个分区(因为有100个分区),然后消费就完成了。 但这里的问题是这只是一个消费者。但是如果我添加多个nodejs进程,我可以完成100个消费者,但是添加每个消费者是一个成本很高的再平衡操作。 我想知道这是否正确的做法?有没有一种方法可以使用Kafka节点触发一个100线程的消费者?
我使用的是高级消费者,如所述:https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例 我注意到,我的消费者不会永远运行,而是在一段时间后结束。在zookeeper一侧,我看到以下内容: INFO已处理的会话终止的setsionid: 0x144a4854325004d(org.apache.zookeeper.server.准备请
本文向大家介绍python kafka 多线程消费者&手动提交实例,包括了python kafka 多线程消费者&手动提交实例的使用技巧和注意事项,需要的朋友参考一下 官方文档:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html 以上这篇python kafka 多线程消费者&手动提交实例就是小编分享给大家