当前位置: 首页 > 知识库问答 >
问题:

如何强制使用者读取Kafka中的特定分区

漆雕和昶
2023-03-14

我有一个从Kafka制作人生成的URL流下载特定web内容的应用程序。我创建了一个有5个分区的主题,其中有5个kafka使用者。但是,网页下载的超时时间为60秒。在下载其中一个url时,服务器会假定消息丢失,并将数据重新发送给不同的使用者。

我已经试过书中提到的所有东西了

Kafka消费者配置/性能问题

https://github.com/spring-projects/spring-kafka/issues/202

但是我每次都得到不同的错误。

是否可以在kafka中将特定的消费者与分区联系起来?我使用kafka-python为我的应用程序

共有3个答案

糜鸿风
2023-03-14

也许我猜你的案子到底发生了什么。如果你的消费者从Kafka那里获取url,然后去下载内容,你说这样做大约需要60秒。因此,您的消费者会阻止下载,并且无法将心跳发送到kafka服务器。所以kafka服务器认为这个消费者已经停机了,所以它进行了组重新平衡,并将未受限制的消息重新发送给其他消费者。

所以你可以尝试两种解决方案:

>

  • 将configs会话超时\u ms设置为60000或更大。默认值为30秒,这对您来说是不够的。

    一个更好的解决方案是使用多线程来做。当你的消费者从Kafka获取消息,然后启动一个新的线程来下载内容时,它不会阻止consumer.poll,所以它可以很好地工作。

  • 孟增
    2023-03-14

    我从未使用过Python客户端,但Java客户端支持assign方法,您可以使用该方法代替subscribe来请求为主题分配特定的分区。当然,您失去了自动再平衡功能,必须手动处理这些用例。

    秦珂
    2023-03-14

    我错过了Kafkapython的文档。我们可以使用TopicPartition类为特定的使用者分配一个分区。

    http://kafka-python.readthedocs.io/en/master/

    >>> # manually assign the partition list for the consumer
    >>> from kafka import TopicPartition
    >>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
    >>> consumer.assign([TopicPartition('foobar', 2)])
    >>> msg = next(consumer)
    
     类似资料:
    • 我们希望在读取消息表单kafka时实现并行性。因此我们想在flinkkafkaconsumer中指定分区号。它将从kafka中的所有分区读取消息,而不是特定的分区号。以下是示例代码: 请建议任何更好的选择来获得并行性。

    • 我对Kafka和Spring Boot是一种新的体验,并试图使我的应用程序从主题的特定分区读取。 单厂代码 这也是我的消费者工厂配置 当我试图运行程序时,它给我一个错误 分区Single上的偏移量提交失败。偏移量308处的Attendance-0:协调器不知道此成员。 和警告 失败:无法完成提交,因为组已重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.p

    • 当Kafka消费者从其分配的分区读取数据时,消费者提取线程是否使用任何特定的逻辑来从分区中获取数据?例如,读取器线程是否做了任何逻辑/努力来平等/一致地读取分配的分区?它是否从最滞后的分区获取更多记录?还是只是简单的循环式逻辑? 有关于消费逻辑的详细文档吗? 谢谢你。

    • 我有事务性的和正常的生产者在应用程序,是写到主题Kafka-主题如下。 事务性Kafka生产者的配置 普通生产者配置相同,只有ProducerConfig.client_id_config和ProducerConfig.Transactional_id_config未添加。 使用者配置如下 因为我将isolation.level设置为read_committed,所以它应该只使用来自订阅主题的事务

    • 问题内容: 在Java中,是否有任何方法可以从文件中读取特定行?例如,读取第32行或任何其他行号。 问题答案: 除非你以前对文件中的行有一定的了解,否则没有阅读前31行就无法直接访问第32行。 所有语言和所有现代文件系统都是如此。 如此有效地,你只需阅读第32行,直到找到第32行为止。

    • 问题内容: 我正在使用Apache POI库,但是我有一些不想读取的数据-因此,我需要该程序才能从特定的行开始读取文件。 我想要来自第10行之后的单元格和行的所有数据,直到文档为空。我尝试了以下代码。 但这只会为我提供第10行中所有单元格的数据。 我期待收到您的来信:-)。 问题答案: 您仅从此处的第11行获取数据: 请参阅Sheet.getRow(int rownum) 的文档 返回基于0的逻辑

    • 问题内容: 可以说我有一个文本文件:data.txt(包含2000行) 如何从500-1500,然后从1500-2000读取给定的特定行,并显示特定行的输出? 此代码将读取整个文件(2000行) 如何修改上述代码以读取特定行? 问题答案: 我建议使用java.io.LineNumberReader。它扩展了BufferedReader,您可以使用它来获取当前行号 您还可以使用Java 7 ,如果适

    • 我正在使用Apache POI库,但我有一些数据我不想被读取 - 所以我需要程序从特定行开始读取文件。 我需要第10行之后的单元格和行中的所有数据,直到文档为空。我已尝试使用以下代码。 但它只会给我第10行单元格的所有数据。 我期待着你的消息:-)。