当前位置: 首页 > 知识库问答 >
问题:

kafka使用者获取API不返回正确的偏移量值

颛孙成益
2023-03-14

我已经建立了一个实验性的Kafka环境,有3个代理和一个有3个分区的主题,我有一个生产者和一个消费者。我想为特定使用者修改分区的偏移量。我在kafka文档中读到,kafka中的使用者提交/获取API可以提交特定的偏移量或获取使用者读取的最新偏移量。以下是API的链接:

但是,我已经生成了一些消息,我的使用者已经使用了这些消息并输出每个已读消息的偏移量。

如果有人能帮上忙,我将不胜感激。我想知道我的代码的哪一部分是错的。或者可能API有问题。请不要犹豫,提出任何有用的意见。我的代码与我提供的链接中的代码完全相同。但是,如果你需要看我的代码,请告诉我把它放在这里。

Kafka版本是0.10.2.0

主题:“TestPIC3”

………………………………。

使用者配置:

props.put("group.id", "test");

props.put("client.id", "MyConsumer");
public class KafkaOffsetManage {

public static void main(String[] args) {


    BlockingChannel channel = new BlockingChannel("localhost", 9095,
            BlockingChannel.UseDefaultBufferSize(),
            BlockingChannel.UseDefaultBufferSize(),
            5000 /* read timeout in millis */);
    channel.connect();
    final String MY_GROUP = "test";
    final String MY_CLIENTID = "MyConsumer";
    int correlationId = 0;
    final TopicAndPartition testPartition0 = new TopicAndPartition("testpic3",0);
    final TopicAndPartition testPartition1 = new TopicAndPartition("testpic3",1);
    final TopicAndPartition testPartition2 = new TopicAndPartition("testpic3",2);
    channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
    ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());
    System.out.println("+++++++++++++++++++++++++++");

    System.out.println(metadataResponse.errorCode());

    if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
        Broker offsetManager = metadataResponse.coordinator();
        // if the coordinator is different, from the above channel's host then reconnect
        channel.disconnect();
        channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(),
                5000 /* read timeout in millis */);
        channel.connect();
        System.out.println("Connected to Offset Manager");
        System.out.println(offsetManager.host() + ",  Port:"+ offsetManager.port());

    } else {
        // retry (after backoff)
    }



    // How to fetch offsets


    List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
    partitions.add(testPartition0);
    //partitions.add(testPartition1);
    OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
            MY_GROUP,
            partitions,
            (short) 2 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
            correlationId,
            MY_CLIENTID);
    try {
        channel.send(fetchRequest.underlying());
        OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
        OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);

        short offsetFetchErrorCode = result.error();
        if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
            channel.disconnect();
            // Go to step 1 and retry the offset fetch
        } else if (offsetFetchErrorCode  == ErrorMapping.OffsetsLoadInProgressCode()) {
            // retry the offset fetch (after backoff)
        } else {
            long retrievedOffset = result.offset();
            String retrievedMetadata = result.metadata();
            System.out.println("The retrieved offset is:"+ Long.toString(retrievedOffset));
            System.out.println(retrievedMetadata);
            System.out.println(result.toString());
        }
    }
    catch (Exception e) {
        channel.disconnect();
        // Go to step 1 and then retry offset fetch after backoff
    }
 }
}
+++++++++++++++++++++++++++
0

Connected to Offset Manager

user-virtual-machine,  Port:9093
------------------------
The retrieved offset is:-1

OffsetMetadataAndError[-1,,3]

Process finished with exit code 0
<artifactId>kafka_2.10</artifactId> 
<version>0.10.2.0</version> 
<artifactId>kafka_2.10</artifactId> 
<version>0.8.2.0</version>

谢谢,

共有1个答案

楚畅
2023-03-14

这种情况发生是因为偏移量过期。Kafka有两个参数控制着这种行为。首先是“__consumer_offsets”主题的“retention.ms”设置。它应该等于-1来禁用该主题内记录的过期。我假设使用Kafka版本1.1.x。使用命令检查主题配置:

$ ./kafka-configs.sh --entity-type topics \
                     --entity-name __consumer_offsets \
                     --zookeeper localhost:2181 \
                     --describe
Configs for topic '__consumer_offsets' are compression.type=producer,cleanup.policy=compact,min.insync.replicas=2,segment.bytes=104857600,retention.ms=-1,unclean.leader.election.enable=false

如果不满足配置设置,请使用命令进行更改:

$ ./kafka-configs.sh --entity-type topics \
                     --entity-name __consumer_offsets \
                     --zookeeper localhost:2181 \
                     --alter \
                     --add-config retention.ms=-1

假设设置了保留策略,接下来需要检查主题中是否有任何提交的消息。默认情况下,Kafka不允许阅读内部主题。要更改此行为,请创建一个具有使用者设置的文件:

$ echo exclude.internal.topics=false > consumer.properties
$ ./kafka-console-consumer.sh --consumer.config consumer.properties \
                              --from-beginning \
                              --topic __consumer_offsets \
                              --zookeeper localhost:2181 \
                              --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
[test_client,Test.Purposes,2]::[OffsetMetadata[13,NO_METADATA],CommitTime 1534165245681,ExpirationTime 1534251645681]
[test_client,Test.Purposes,0]::[OffsetMetadata[14,NO_METADATA],CommitTime 1534165245776,ExpirationTime 1534251645776]
[test_client,Test.Purposes,1]::[OffsetMetadata[8,NO_METADATA],CommitTime 1534165690946,ExpirationTime 1534252090946]
ExpirationTime = CommitTime + offsets.retention.minutes
$ date -d @1534165245
Mon Aug 13 16:00:45 UTC 2018

$ date -d @1534251645
Tue Aug 14 16:00:45 UTC 2018

正好是24小时。

因此,解决不正确偏移量问题的方法是增加“offsets.retention.minutes”设置,记住这会影响代理内存的使用,当系统中有许多死亡的使用者组时,还会定期提交未更改的偏移量以增加过期时间。

 类似资料:
  • 我有一个话题是两个消费群体消费的。题目中有10条留言。 现在我开始应用程序2(消费者组2),它正在消费相同的主题。它不在处理消息。当我描述kafka-consumer-groups(带有--group consumerGroup2)时,它令人惊讶地显示CURRENT-OFFSET=10和LOG-END-OFFSET=10。 理想情况下,这种情况不应该发生,并且kafka应该能够识别对于消费者组2没

  • 我的用例是使用kafka消费者api,这样我们就可以从kafka主题中手动读取最后一次成功处理的数据的偏移量,然后手动确认Kafka的成功处理数据。(这是为了减少数据丢失)。然而,在我当前的实现中,程序向前移动并从下一个偏移读取,即使我注释掉了“ack.acknowledge()”。我是新来的Kafka和实现我的消费者下面的方式(我们使用Spring引导) 问题是:即使我注释掉ack.acknow

  • 对于Java Kafka消费者函数,它要求传入和。但是,我认为这个seek方法将为我的使用者获取订阅的TopicPartitions的集合。 这是我试图处理的例子。 使用者A订阅主题“test-topic”分区1和2。当调用时,我从每个分区读取消息。我处理了一些消息,但我的应用程序得到了一个异常。我不调用。现在,我要倒回到在上次中检索到的偏移量,并尝试重新处理它们。那么我该怎么做呢?我是否需要检查

  • 我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个

  • 我们有一个问题,似乎Kafka消费者没有收到发布到某个主题的消息。(我说这是因为我还没有弄清楚这件事的真相,我可能错了。) 我使用Spring for Apache Kafka,而我的消费者实际上是一个用注释的方法。 这个问题是断断续续的,我很难重新创建它。 有没有一种方法让我看看Kafka经纪人的日志,或任何其他工具,以帮助我找出抵消为我的消费者?我想要具体的证据来证明我的消费者是否收到了信息。