在向kafka写入主题时,出现错误:偏移提交失败
:
2016-10-29 14:52:56.387 INFO [nioEventLoopGroup-3-1][org.apache.kafka.common.utils.AppInfoParser$AppInfo:82] - Kafka version : 0.9.0.1
2016-10-29 14:52:56.387 INFO [nioEventLoopGroup-3-1][org.apache.kafka.common.utils.AppInfoParser$AppInfo:83] - Kafka commitId : 23c69d62a0cabf06
2016-10-29 14:52:56.409 ERROR [nioEventLoopGroup-3-1][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$DefaultOffsetCommitCallback:489] - Offset commit failed.
org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The group coordinator is not available.
2016-10-29 14:52:56.519 WARN [kafka-producer-network-thread | producer-1][org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater:582] - Error while fetching metadata with correlation id 0 : {0085000=LEADER_NOT_AVAILABLE}
2016-10-29 14:52:56.612 WARN [pool-6-thread-1][org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater:582] - Error while fetching metadata with correlation id 1 : {0085000=LEADER_NOT_AVAILABLE}
当使用命令创建一个新主题时,它是确定的。
./kafka-topics.sh --zookeeper localhost:2181 --create --topic test1 --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
这是使用Java的生产者代码:
public void create() {
Properties props = new Properties();
props.clear();
String producerServer = PropertyReadHelper.properties.getProperty("kafka.producer.bootstrap.servers");
String zookeeperConnect = PropertyReadHelper.properties.getProperty("kafka.producer.zookeeper.connect");
String metaBrokerList = PropertyReadHelper.properties.getProperty("kafka.metadata.broker.list");
props.put("bootstrap.servers", producerServer);
props.put("zookeeper.connect", zookeeperConnect);//声明ZooKeeper
props.put("metadata.broker.list", metaBrokerList);//声明kafka broker
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 1000);
props.put("linger.ms", 10000);
props.put("buffer.memory", 10000);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(props);
}
哪里错了
您好,您必须保持您的Kafka副本和复制因子为您的代码相同。
对我来说,我保留了3个副本和3个复制因子。
查看日志,问题在于集群可能没有连接到节点,而节点是zookeeper中已知的给定主题的唯一副本。
您可以使用给定的命令检查它:kafka主题。sh--描述--zookeeper本地主机:2181--主题test1
或者使用kafkacat:kafkacat-L-blocalhost:9092
示例结果:
Metadata for all topics (from broker 1003: localhost:9092/1003):
1 brokers:
broker 1003 at localhost:9092
1 topics:
topic "topic1" with 1 partitions:
partition 0, leader -1, replicas: 1001, isrs: , Broker: Leader not available
若您有单节点集群,那个么代理id(1001)应该和topic1分区的前导相匹配
但正如您所看到的,唯一已知的topic1副本是1001,现在不可用,因此不可能在不同的节点上重新创建topic。
问题的来源可以是自动生成的代理ID(如果你没有指定broker.id
或者它被设置为-1
)。接收不同于以前的代理ID,不同于在zoowatch中标记的代理ID(这是分区删除可以帮助的原因-但它不是生产解决方案)。
解决方案可能是设置代理。将节点配置中的id值设置为固定值-根据文档,应该在produciton环境中执行此操作:broker。id=1
如果一切正常,你应该收到这样的东西:
Metadata for all topics (from broker 1: localhost:9092/1001):
1 brokers:
broker 1 at localhost:9092
1 topics:
topic "topic1" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
Kafka文档:https://kafka.apache.org/documentation/#prodconfig
我面临着类似的问题。问题是,当您启动Kafka代理时,有一个属性与之关联,“Kafka\u偏移量\主题\复制\因子”。如果使用单节点群集,请确保将此属性设置为值“1”。因为它的默认值是3。这个改变解决了我的问题。(您可以检查Kafka.properties文件中的值)注意:我使用的是confluent Kafka版本4.0.0的基本图像(confluent Inc/cp Kafka:4.0.0)
问题内容: 我有以下结构: 通过kafka-topics shell脚本创建了具有复制因子3和分区3的主题。 并使用组localConsumers。领导没事的时候工作正常。 消费者日志 但是,如果领导者倒下了-我在消费者中遇到了错误(systemctl stop kafka): 节点3不可用。好 消费者日志 使用者无法连接,直到领导者掉线或与另一个使用者组重新连接。 不明白为什么会这样?消费者应重
关于颜色组 颜色组是一个组织工具,可让您在“色板”面板中将相关色板编组在一起。此外,颜色组可以是用于颜色协调的容器,您可以使用“编辑颜色”/“重新着色图稿”对话框或“颜色参考”面板来创建它。颜色组只能包含实色,其中包括专色、印刷色或全局色。不能对渐变和图案进行分组。 可以使用“颜色参考”面板或“编辑颜色”/“重新着色图稿”对话框创建协调颜色组。通过使用其中的任一功能,可以选择一个颜色协调规则以基于
我已经开发了一个使用apache storm使用kafka消息的应用程序,当我在eclipse中运行topology using in LocalCluster时,它可以正常工作,消息也可以正常使用,但是当我使用storm命令(bin\storm jar..\kafka-storm-0.0.1-SNAPSHOT.jar com.kafka_storm.util.topology storm kaf
我使用的是Spring-Kafka2.2.2.release(org.apache.kafka:kafka-clients:jar:2.0.1)和spring-boot(2.1.1)。我无法执行事务,因为我的侦听器无法获得分配的分区。我只为一个消费者创建了建议的配置。我正在尝试配置一个事务性侦听器容器,并且只处理一次 我使用事务管理器配置了生产者和使用者,生产者使用事务id,使用者使用isolat
我已经在互联网上闲逛了很长一段时间,我想知道如何在不打开不同端口的情况下一起使用BufferedReader和DataInputStream。我尝试过流式图像,但由于缓冲读取器存储了额外的字节,它创建了一个损坏的图像。当我尝试使用datainputstream.read()时,我无法轻松地读取整行文本。我的问题有什么解决办法吗?