当前位置: 首页 > 面试题库 >

如何编写Kafka使用者-单线程与多线程

司徒英卓
2023-03-14
问题内容

我编写了一个单一的Kafka使用者(使用Spring
Kafka),该使用者从单个主题中读取内容,并且是使用者组的一部分。消耗完一条消息后,它将执行所有下游操作,并移至下一个消息偏移。我将其打包为WAR文件,并且我的部署管道将其推送到单个实例。使用部署管道,我可以将该工件部署到部署池中的多个实例。

但是,当我希望多个消费者作为基础架构的一部分时,我无法理解以下内容:

  • 实际上,我可以在部署池中定义多个实例,并在所有这些实例上运行此WAR。这意味着,他们所有人都在听同一个话题,属于同一个消费群体,并且实际上会在自己之间划分分区。下游逻辑将按原样工作。对于我的用例来说,这工作得很好,但是,我不确定这是否是遵循的最佳方法?

  • 在线阅读时,我在这里和这里遇到了各种资源,人们在其中定义了一个消费者线程,但在内部却创建了多个工作线程。在某些示例中,我们可以定义执行下游逻辑的多个使用者线程。考虑这些方法并将其映射到部署环境,我们可以达到相同的结果(如我上面的理论解决方案所能达到的),但是使用的机器数量更少。

就我个人而言,我认为我的解决方案简单,可扩展,但可能不是最佳选择,而第二种方法可能是最佳选择,但我想了解您的经验,建议或我应该考虑的其他指标/限制?另外,我正在考虑我的理论解决方案,实际上我可以作为Kafka的消费者使用简单的简单机器。

据我所知,我尚未发布任何代码,如果需要将此问题移至另一个论坛,请告诉我。如果您需要特定的代码示例,我也可以提供它们,但就我的问题而言,我认为它们并不重要。


问题答案:

您现有的解决方案是最好的。移交给另一个线程将导致偏移管理问题。只要您有足够的分区,Spring kafka允许您在每个实例中运行多个线程。



 类似资料:
  • 我编写了一个Kafka消费者(使用Spring Kafka),它从单个主题读取并且是消费者组的一部分。一旦消息被消费,它将执行所有下游操作并继续下一个消息偏移量。我已将其打包为WAR文件,我的部署管道将其推送到单个实例。使用我的部署管道,我可能可以将此工件部署到我的部署池中的多个实例。 但是,当我希望多个消费者作为我的基础设施的一部分时,我无法理解以下内容- > 实际上,我可以在部署池中定义多个实

  • 问题内容: 我想看看使用多线程生产者而不是单线程生产者会有多少时间差异。我在本地计算机上设置了ActiveMQ,编写了生产者类,该类将初始化并在其构造函数中启动JMS连接。我将消息限制设置为3M,将所有消息推送到ActiveMQ大约花费了50秒。我只发送了一个字符串“ hello world” 3M次。 然后,我使用了相同的生产者对象(一个连接但有多个会话),并使用线程大小为8的ExecutorS

  • Kafka的doc给出了一种方法,大约用以下描述: 每个线程一个消费者:一个简单的选择是为每个线程提供自己的消费者 我的代码: 但它不起作用,引发了一个异常: JAVAutil。ConcurrentModificationException:KafkaConsumer对于多线程访问不安全 此外,我还阅读了Flink(一个用于分布式流和批处理数据的开源平台)的源代码。Flink使用多线程消费程序与我

  • 为什么单线程和多线程脚本具有相同的处理时间?多线程实现不是应该少1/#线程数吗?(我知道当您达到最大cpu线程时,回报会递减) 我搞砸了我的实现吗?

  • 我对Apache Kafka是新手,我试图理解以下两个方面的区别: 创建属于同一组id的两个使用者,这些使用者来自同一主题的两个分区。 用两个线程创建一个使用者,这些线程来自同一主题的两个分区。 在第一种方法中,我实际上理解的是,每个使用者将只使用与之“相关”的分区的消息,因为这两个使用者属于同一个组。 因此,在下面的示例中,可能会发生一些不同的情况: Thread1使用AAAA和CCCC/Thr

  • 我正在进行一个项目,我们在该项目中遇到了线程问题。有一个问题使线程终止,并且线程无法自动重新启动,因此我们想更改当前的实现。我们想出了一个想法来运行一个观察者线程来管理文件处理线程。 请有人建议如何制作监视程序状态的线程以及如何执行此操作的想法。 编辑:所以基本上这就是发生的事情。用户上载一些写入文件共享的媒体文件。移动服务中运行的线程扫描媒体文件并附加到电子邮件。在此过程中,线程终止,无法将媒体