使用kafka版本2.11-0.11.0.3发布10,000条消息(所有消息的总大小为10MB),将有2个消费者(具有相同的group-id)作为并行处理来消费该消息。在消费过程中,两个消费者都消费了相同的消息。
以下错误/警告是Kafka抛出的
警告:此成员将离开组,因为使用者轮询超时已过期。这意味着对poll()的后续调用之间的时间长于配置的max.poll.interval.ms,这通常意味着poll循环花费了太多时间处理消息。您可以通过增加max.poll.interval.ms或通过减少poll()中返回的批的最大大小和max.poll.records来解决这一问题。
信息:尝试心跳失败,因为组正在重新平衡
信息:向协调器发送LeaveGroup请求
警告:同步自动提交偏移量{INGEST-DATA-1=OFFSETANDMETADATA{OFFSET=5506,LeaderEPOCH=NULL,METADATA=“”}}失败:无法完成提交,因为组已重新平衡并将分区分配给其他成员.这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多时间处理消息。您可以通过增加max.poll.interval.ms或通过减少poll()中返回的批的最大大小和max.poll.records来解决这一问题。
向kafka提供了以下配置
server.properties
max.poll.interval.ms=30000
group.initial.rebalance.delay.ms=0
group.max.session.timeout.ms=120000
group.min.session.timeout.ms=6000
Consumer.Properties
session.timeout.ms=30000
request.timeout.ms=40000
应该改变什么来解决多次消费?
你的消费者是同一群吗?如果是,如果使用者离开/死亡/超时而没有提交它已处理的一些消息,那么您将有多个使用者。
如果您的所有消息都被两个消费者使用,那么您可能没有为他们设置相同的组id。
更多信息:
所以您已经为所有消费者设置了相同的组id,很好。您所处的情况是,集群/代理认为某个消费者已经死亡,因此将负载重新平衡到另一个消费者。另一个将从上次提交的地方开始使用。
因此假设使用者C_A从分区P_1读取最多为100的偏移量,然后处理它们,然后提交'100',然后读取最多为200的偏移量,然后处理它们,但不能提交,因为代理认为C_A已死。
代理将分区P_1重新分配给使用者C_B,使用者C_B将从组的最后一次提交开始(组的最后一次提交为100),读取最多200,处理并提交200。
所以你的问题是如何避免消费者被认为是死了(我假设它不是死的)?
答案已经在您的问题中的黄色警告消息中:您可以告诉您的使用者在一次轮询中消耗更少的消息(max.poll.records),以减少两次轮询之间对代理的处理时间,和/或您可以增加max.poll.interval.ms,告诉代理在认为您的使用者已死亡之前等待更长时间...
我有一个关于2个代理上的3个分区的主题。(Kafka版本:0.8.1) 使用不同的用户guid(如:FC42B34DD7658503E040970A2C437358)作为分区密钥批量添加消息。(约10K条消息) 在加载消息时,我有一个正在运行的消费者(consumer1),它开始很好地处理消息。 然后我用相同的消费者组ID启动了另一个消费者(consumer2)。 我希望两个消费者都应该分配负载。
我试图为我们的服务器安装负载平衡器。如果使用http,它可以正常工作。但是当我切换到https时 我在浏览器控制台中遇到以下错误: 混合内容:页面位于'https://www.something.com/'通过HTTPS加载,但请求了一个不安全的脚本'…mootools.js'。此请求已被阻止;内容必须通过HTTPS提供 我想我做了一些硬代码,比如“http://www.something.com
我有两条溪流。一个是事件流,另一个是数据库更新流。我想用从DB更新流构建的信息丰富事件流。 事件流非常庞大,使用5个字段进行分区。这给了我很好的分配。DB流不那么喋喋不休,并且使用两个字段进行分区。我目前正在使用两个公共字段连接这两个流,并使用flapMap来丰富第一个流。flatMap运算符使用ValueState维护状态,状态由两个公共字段自动键入。 除了实现自定义逻辑来手动提取键并更新维护状
我是微服务的新手。(学习阶段)。我有一个问题。我们在云中部署微服务。(例如 AWS)。云已经提供了负载平衡和日志。我们还在Spring Boot中实现了负载平衡(功能区)和日志(Rabbit MQ和Zipkin)。这两种实现有什么区别?我们两者都需要吗?有些人可以回答这些问题吗? 提前感谢。
我试图在AWS EKS集群中托管以下(部署前端)Kubernetes部署,在部署部署并创建服务和入口之后,所有的东西都成功地部署和创建了,但是当我试图从外部访问负载平衡器DNS时,这个负载平衡器是不可访问的。有人能指出原因吗? **下面的代码(deployment-2048)正在工作,负载均衡器是可访问的,但在(部署前端)**的情况下是不可访问的。
web-service预期会有很多调用,而我希望在出现故障时使该服务成为冗余,因此我希望有两个实例同时运行以处理所有请求。 1)让两个级别的Web服务同时处理请求的最佳方法是什么?使用外部负载均衡器还是使用AKKA/AKKA-HTTP中的某种魔法(我不知道)? 2)我必须调整哪些主要参数来提高性能?