TL;DR;我试图理解一个被分配了多个分区的单个使用者是如何处理reach分区的消费记录的。
例如:
我找到了ranged
或roundrobin
分配程序的partition.assignment.strategy
配置,但这只决定了使用者如何分配分区,而不是它如何从分配给它的分区中使用。
我开始深入研究KafkaConsumer源代码,#poll()将我引向#pollforfetches()#pollforfetches(),然后将我引向fetcher#fetchedRecords()和fetcher#sendfetches()
这就导致我试图沿着整个Fetcher类一起来学习,也许现在已经晚了,也许我没有深入研究,但是我很难弄清楚使用者将如何处理多个分配的分区。
背景
在Kafka流支持的数据管道上工作。
在这个流水线的几个阶段,当记录由不同的Kafka Streams应用程序处理时,流被连接到由外部数据源提供所需数据的压缩主题提要中,这些数据将在继续处理的下一阶段之前在记录中得到增强。
在这一过程中,有几个死信主题的记录无法与外部数据源匹配,而外部数据源本来会增加记录。这可能是因为数据还不是可用的(事件或活动还不是活动的),或者它是坏数据,永远不会匹配。
目标是在发布任何新的增强数据时,重新发布来自死信主题的记录,这样我们就可以匹配来自死信主题的先前不匹配的记录,以便更新它们并将它们发送到下游进行额外的处理。
记录在几次尝试中可能无法匹配,并且可能在死信主题中有多个副本,因此我们只想重新处理现有记录(在应用程序启动时的最新偏移量之前)以及自应用程序上次运行以来发送到死信主题的记录(在先前保存的使用者组偏移量之后)。
当我的使用者过滤掉应用程序启动后到达的任何记录时,它工作得很好,而我的生产者通过将偏移量提交为发布事务的一部分来管理我的使用者组偏移量。
但我想确保最终会从所有分区中消耗,因为我遇到了一个奇怪的边缘情况,未匹配的记录被重新处理,并与之前在dead letter主题中一样,落在同一个分区中,结果却被消耗者过滤掉。虽然它没有得到新的记录批处理,但也有一些分区还没有被重新处理。
如果能帮助理解单个使用者如何处理多个分配的分区,将非常感谢。
在fetcher
中,您的思路是正确的,因为大多数逻辑都在那里。
首先,正如消费者Javadoc所提到的:
如果为使用者分配了多个分区以从中获取数据,它将尝试同时从所有分区中使用,从而有效地为这些分区提供相同的优先级以供使用。
可以想象,在实践中,有几件事需要考虑。
>
每当使用者尝试提取新记录时,它将排除已有记录等待的分区(从上一次提取中)。已经有正在运行的fetch请求的分区也会被排除在外。
当提取记录时,使用者在提取请求中指定fetch.max.bytes
和max.partition.fetch.bytes
。代理使用这些数据来分别确定要返回的数据总量和每个分区的数据总量。这同样适用于所有分区。
使用这两种方法,在默认情况下,使用者尝试公平地使用所有分区。如果情况并非如此,更改fetch.max.bytes
或max.partition.fetch.bytes
通常会有所帮助。
如果您希望将某些分区优先于其他分区,则需要使用pause()
和resume()
手动控制消耗流。
null 我在这一页上读到以下内容: 使用者从任何单个分区读取,允许您以与消息生成类似的方式扩展消息消耗的吞吐量。 也可以将使用者组织为给定主题的使用者组-组内的每个使用者从唯一分区读取,并且组作为一个整体使用来自整个主题的所有消息。 如果使用者多于分区,则某些使用者将空闲,因为它们没有可从中读取的分区。 如果分区多于使用者,则使用者将从多个分区接收消息。 如果使用者和分区的数量相等,则每个使用者
我知道kafka将一个主题的数据安排在许多分区上,一个消费者组中的消费者被分配到不同的分区,从那里他们可以接收数据: 我的问题是: 术语,它们是由主机/IP标识的,还是由客户端连接标识的? 换句话说,如果我启动两个线程或进程,使用相同的消费者组运行相同的Kafka客户端代码,它们被认为是一个消费者还是两个消费者?
我有一个将消息写入主题/分区的生产者。为了保持顺序,我希望使用单个分区,我希望12个使用者读取来自这个分区的所有消息(没有使用者组,所有消息都应该发送给所有使用者)。这是可以实现的吗?我读过一些论坛,每个分区只有一个用户可以阅读。
谁能请解释和指导我链接或资源阅读关于Kafka消费者如何在下面的场景下工作。 > 一个有5个消费者的消费者组和3个分区的主题(Kafka是如何决定的) 一个消费者组有5个消费者,主题有10个分区(kafka如何分担负载) 两个消费者组和两个服务器的kafka集群,其中一个主题被划分在节点1和节点2之间,当来自不同组的消费者订阅到一个分区时,如何避免重复。 上面可能不是配置kafka时的最佳实践,但
我想知道一个使用者如何从多个分区使用消息,具体来说,从不同的分区读取消息的顺序是什么? 我看了一眼源代码(Consumer,Fetcher),但我不能完全理解。 这是我以为会发生的: 分区是顺序读取的。也就是说:在继续下一个分区之前,一个分区中的所有消息都将被读取。如果我们达到< code>max.poll.records而没有消耗整个分区,则下一次读取将继续读取当前分区,直到耗尽为止,然后继续下
我有一个多分区主题,由多个使用者(同一组)使用。我的目标是最大化消费处理,即任何消费者都可以消费来自任何分区的消息。 我知道这看起来是不可能的,因为只有一个消费者可以从一个分区中消费。 有没有可能使用REST代理来实现这一点?例如,轮询所有代理消费者实例。 谢了。