当前位置: 首页 > 知识库问答 >
问题:

librdkafka C API Kafka Consumer 无法正确读取所有消息

史经业
2023-03-14

我正在使用< code > librdkafka C API consumer(特别是使用< code > rd _ Kafka _ consumer _ poll 来读取,在此之前我确实调用了< code > rd _ Kafka _ poll _ set _ consumer )

我看到的问题是,在我的谷歌测试中,我做了以下操作

>

  • 给Kafka写3条信息

    初始化/启动kafka消费者(< code > rd _ Kafka _ consumer _ poll )

    recycle_cb中,我将每个分区偏移量设置为RD_KAFKA_offset_STORED,并将它们分配给句柄

    在这一点上,我相信它应该读取3条消息,但它只读取最后一条消息,但令人惊讶的是,每个分区的偏移量已经更新!

    我在这里使用 Kafka 消费者遗漏了什么吗?

    还有一个问题是,我最初认为存储的偏移量在 kafka 代理中,并且主题消费者组 id 分区组合存在唯一的偏移量。

    所以我认为不同的消费者群体阅读同一主题应该有不同的补偿。

    但是,情况并非如此。当使用不同的消费者群体时,我总是从相同的偏移量读取。

    我怀疑这可能与抵消promise有关,但不确定在何处解决这一问题。

    有什么见解吗?

  • 共有1个答案

    杜烨伟
    2023-03-14

    要查看的配置:< code>auto.offset.reset

    来自 Kakfa 消费者文档:

    当Kafka中没有初始偏移量或当前偏移量在服务器上不再存在时该怎么办

    来自 librdkafka 文档:

    当偏移量存储中没有初始偏移量或所需偏移量超出范围时要采取的操作:“最小”、“最早”-自动将偏移量重置为最小偏移量、“最大”、“最近”-自动重置偏移量为最大偏移量,“错误”-触发错误,通过使用消息并检查“消息”-

    默认值为最新值。

    此外

    #define RD_KAFKA_OFFSET_STORED -1000
    

    所以,您试图将分区偏移设置为-1000,这显然不是一个有效的偏移。显然,librdkafka在这种情况下读取了最后一条消息(我并没有检查代码)。

     类似资料:
    • 我的目标是使用Flink KafkaSource阅读来自Kafka主题的所有消息。我尝试用批处理和流模式执行。问题如下:当我将env.setParallelism设置为高于2时,我必须使用包含bug的接收器。于是,我设置了例如:< code > streamexecutionenvironment . setparallelism(1); 我想使用的Kafka主题包含3个分区。这是我的代码片段:

    • 有没有解决这个问题的方法???我无法读取KAFKA-AVRO架构消息。我正在尝试将消息从logstash发送到KAFKA到hdfs。 以下是技术堆栈: LogStash 2.3-当前生产版本 汇流3.0。 插件:A。Logstash-kafka-Output插件B。logstash-codec-avro。 动物园管理员:3.4.6 Kafka:0.10.0.0 Logstash配置文件如下所示:

    • 我正在获取O并且从未进行成功的Hibernate连接测试,在学习了本教程“JasperReports with hibernate-module1”和JasperReports with hibernate-module2之后,弹出一个错误,说“Could not parse mapping document from resource com/report/mappings/department

    • 我们正在尝试读取PDF并动态填充其中的值。根据传入的请求,我们运行一些规则,导出要使用的PDF,然后动态地向其填充值。我们使用的是ApachePDFBox版本2.0.11,由于某些原因,我们在使用特定的PDF模板时遇到了问题。我们无法读取此特定模板的某些字段,生成的PDF不完整。想知道是否与原始PDF本身有关。下面是我们用来读取字段并填充字段的代码片段。 当我们试图打印每个字段名时,我们发现超过3

    • 我正在尝试托管一个使用谷歌Kubernetes引擎的应用程序。我的docker映像在本地运行时可以工作,但当我将其放到Google Cloud上并使用kubernetes集群进行设置时,它以一种非常奇怪的方式失败了。 我能够连接到应用程序,并且它可以工作,直到我触发的调用。然后它试图读取我通过环境变量提供的文件,结果出现了问题。我得到了这个(被截断和编辑的)回溯: 有没有人能猜到这里出了什么问题?

    • 我想使用amqp设置一个消费者,以便从特定队列读取。一些谷歌指出,这可以通过amqp_basic_get来实现,查看文档,实际的消息是通过amqp_read_消息检索的。我还发现了这个例子,我试图按照这个例子来实现基本的。然而,我无法从特定队列获取和读取消息。 我的场景是这样的:我有两个程序,通过发布和使用Rabbitmq服务器进行通信。在每个通道中,都声明了一个连接,有两个通道,一个用于消费,一