当前位置: 首页 > 知识库问答 >
问题:

Kafka__consumer_offsets主题具有过多的分区计数

颜阳炎
2023-03-14

我使用的是Kafka 0.8.2,而我的使用者却出现了一个错误:“OFFSET commit FAILL with...”。当查看主题“__consumer_offsets”时。我看到它有50个分区计数。正常吗?我只能通过删除所有的Kafka日志并重新启动我的Kafka服务器来解决这个问题。是否有一种方法,我可以删除这个主题时,它达到一定数量的分区,还是我提交的偏移量是错误的?

下面是我提交偏移的方式:

 public void commitOffsets(BlockingChannel channel, String topic, String    groupid, int partition, String clientName, int corrilationid, long offset)   throws Exception{

    if (commitTryCount > 100){
        throw new Exception("Offset commit failed with " + channel.host());
    }

    long now = System.currentTimeMillis();
    Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
    //for (int i = 0; i < this.totalPartitions; i++){
        TopicAndPartition topicPartition = new TopicAndPartition(topic, partition);
        offsets.put(topicPartition, new OffsetAndMetadata(offset, topic, now));
    //}     

    //initialize offset commit
    OffsetCommitRequest commitRequest = new OffsetCommitRequest(groupid, offsets, corrilationid, clientName, (short) 1);
    channel.send(commitRequest.underlying());
    OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer());
    if (commitResponse.hasError()){         
        for (Object partitionErrorCode: commitResponse.errors().values()){
            if (Short.parseShort(partitionErrorCode.toString()) == ErrorMapping.OffsetMetadataTooLargeCode()){
                //reduce the size of the metadata and retry
                offset--;
                commitOffsets(channel, topic, groupid, partition, clientName, corrilationid, offset);
                commitTryCount++;
            } else if (Short.parseShort(partitionErrorCode.toString()) == ErrorMapping.NotCoordinatorForConsumerCode()
                    || Short.parseShort(partitionErrorCode.toString()) == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                //discover new coordinator and retry
                int newCorrilation = corrilationid;
                newCorrilation++;
                this.channel = discoverChannel(channel.host(), port, groupid, clientName, newCorrilation);
                commitOffsets(this.channel, topic, groupid, partition, clientName, newCorrilation, offset);
                commitTryCount++;
            } else{
                //retry
                commitOffsets(channel, topic, groupid, partition, clientName, corrilationid, offset);
                commitTryCount++;
            }//end of else              
        }//end of for
    }//end of if
}//end of method

共有1个答案

斜俊
2023-03-14

我把代码贴出来后才弄明白的。当提交成功时,我忘记将变量“committrycount”设置为0。我仍然想知道__consumer_offsets主题有50个分区是否正常?

 类似资料:
  • 我有一个带有2个分区的源主题,我正在用同一个应用程序启动2个kafka streams应用程序。id,但不同的接收器主题。 1) 这两个应用程序实例是否会从不同的分区接收数据? 2)如果其中一个应用程序被杀死,另一个实例会自动从两个实例中消耗吗? 3) 我如何证明上述情况?

  • 我们使用的是 Kafka 2.5.1 版本集群。最近注意到其中一个主题分区数据大小不均匀。与其余分区相比,一个特定分区的大小增加了 300%。这在群集中造成了不均衡的磁盘利用率。 已验证使用者滞后,看起来像其他分区一样正常 此外,我们使用默认分区程序和设置为默认值的“metadata.max.age.ms”配置,即 300000ms(5 分钟) 我们是如何使分区数据均匀分布的?

  • 我需要向现有的Kafka主题添加分区。我知道可以使用bin/kafka主题。sh脚本来实现这一点,但我更愿意通过融合的RESTAPI来实现这一点。 据我所见,api引用中没有记录在案的endpoint,但我想知道这里是否有其他人能够做到这一点。 编辑:由于在这里似乎不可能使用REST api,我想知道在容器化设置中向现有主题添加分区的最佳实践是什么。例如,如果有将客户ID映射到特定分区的自定义分区

  • 问题内容: 如何从代码中获取任何kafka主题的分区数。我研究了许多链接,但似乎没有一个起作用。 提及一些: http://grokbase.com/t/kafka/users/148132gdzk/find-topic-partition-count- through-simpleclient-api http://grokbase.com/t/kafka/users/151cv3htga/ge

  • 我们希望使用Kafka connect sink连接器将消息从Kafka复制到Mongo DB。在我们的用例中,我们有多个主题,每个主题都有一个分区(主题的名称可以用正则表达式表示,例如topic.XXX.name)。这些主题的数量在不断增加。我想知道Kafka connect架构是否适合这个用例。如果是这样,如何配置它的增益高可缩放性和并行性?任务是什么。最大值?工人数量?

  • 我想要任何关于Kafka如何维护消息序列的信息/解释,当消息被写入多个分区的主题时。例如,我有多个消息生成器,每个消息生成器按顺序生成消息,并用超过1个分区编写Kafka主题。在这种情况下,消费者组将如何工作来消费消息。