当前位置: 首页 > 知识库问答 >
问题:

kafka代理中为kafka流应用程序创建的kafka内部主题太多

蔡明贤
2023-03-14

其中一个Kafka流应用程序在Kafka代理和消费者端产生了大量未知生产者ID错误。

流配置如下:

final Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG,appName + "-Client");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServer);
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,StreamsConfig.EXACTLY_ONCE);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,state_dir);
    streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG,defaultReplication);
    return streamsConfiguration;

消费者方面的错误:

这背后的原因是什么?

共有1个答案

轩辕天佑
2023-03-14

这是一个众所周知的问题。参见低流量条件下的KAFKA-7190清除重新分区主题导致关于未知生产者ID的警告声明和KIP-360:改进未知生产者的处理。

 类似资料:
  • 我正在尝试设置一个安全的Kafka集群,但在ACL方面遇到了一些困难。 Kafka流的汇流安全指南(https://docs.Confluent.io/current/Streams/developer-guide/security.html)只说明必须将集群创建ACL交给主体...但它没有说任何关于如何实际处理内部话题的内容。 通过研究和实验,我确定(对于Kafka版本1.0.0): 通配符不能

  • 我有自己的Spring Cloud数据流处理器,里面有Python,我使用这个示例作为指导:https://dataflow.Spring.io/docs/recipes/polyglot/processor/。然后我想缩放并创建其中的三个处理器,因此使用创建了3个Python内部的POD。我稍微修改了示例中的一段代码:当我创建一个Kafka消费者时,我也会传递一个组id,因此消息应该是负载平衡的

  • 我正在使用处理器 API 对状态存储进行一些低级处理。关键是我还需要在存储到商店后写入主题。如何在Spring Cloud Streams Kafka应用程序中完成?

  • 我一直在开发一个基于Java kafka-streams API的应用程序,其目标是处理来自一个kafka主题的数据流,并将其生成到另一个主题中。 看起来,每当我开始使用kafka-streams应用程序生成消息时,我正在使用的kafka代理上的文件句柄就会一直打开,而且从来没有关闭过,这意味着最终kafka服务器会出现太多打开的文件,kafka和zookeeper守护进程崩溃。 我使用API j

  • 我正在编程一个客户端工作与Kafka0.9。我想知道如何制造话题。这个答案:如何通过Java在Kafka中创建一个主题,与我所问的类似。除此之外,该解决方案仅适用于Kafka 0.8.2,这与Kafka 0.9的API有很大不同。