谢谢你的耐心。
而且,当处理消息发生错误时,使用者线程不会恢复。我们的消费者读取消息并将其插入MySQL。一旦网络出现故障,consumer无法连接到MySql,然后它阻塞并停止读取消息,直到我们重新启动它。
“要知道,分区的一个用例是对数据进行语义分区,而添加分区并不会改变现有数据的分区,因此如果用户依赖于该分区,这可能会干扰他们。也就是说,如果数据是由hash(key)%number_of_partitions分区的,那么该分区可能会通过添加分区而被洗牌,但Kafka不会尝试以任何方式自动重新分发数据。”
“不尝试自动重新分发数据”是什么意思?旧数据不变,新数据不发到添加的分区?
kafka.producer.async.ProducerSendThread.error():103: - Error in handling batch of 65 events
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) ~[kafka_2.9.2-0.8.2.0.jar:na]
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) [kafka_2.9.2-0.8.2.0.jar:na]
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) [kafka_2.9.2-0.8.2.0.jar:na]
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) [kafka_2.9.2-0.8.2.0.jar:na]
at scala.collection.immutable.Stream.foreach(Stream.scala:526) [scala-library-2.9.2.jar:na]
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) [kafka_2.9.2-0.8.2.0.jar:na]
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [kafka_2.9.2-0.8.2.0.jar:na]
kafka.producer.async.DefaultEventHandler.error():97: - Failed to send requests for topics risk_acts with correlation ids in [433266,433395]
在添加新的经纪人时也会出现同样的问题。我们必须将新的代理主机名和端口添加到producer中的“metadata.broker.list”配置并重新启动它。
我们使用的是高级api,而kafka的版本是:
<dependency>
<groupId> org.apache.kafka</groupId >
<artifactId> kafka_2.9.2</artifactId >
<version> 0.8.2.0</version >
</dependency>
生产者配置:
<entry key="metadata.broker.list" value="${metadata.broker.list}" />
<entry key="serializer.class" value="kafka.serializer.StringEncoder" />
<entry key="key.serializer.class" value="kafka.serializer.StringEncoder" />
<entry key="request.required.acks" value="-1" />
<entry key="producer.type" value="async" />
<entry key="queue.enqueue.timeout.ms" value="-1" />
<entry key="compression.codec" value="1" />
使用者配置:
<entry key="zookeeper.connect" value="${zookeeper.connect}" />
<entry key="group.id" value="${kafka.consumer.group.id}" />
<entry key="zookeeper.session.timeout.ms" value="40000" />
<entry key="rebalance.backoff.ms" value="10000" />
<entry key="zookeeper.sync.time.ms" value="2000" />
<entry key="auto.commit.interval.ms" value="1000" />
<entry key="auto.offset.reset" value="smallest" />
生产者代码和消费者代码类似于:https://cwiki.apache.org/confluence/display/kafka/0.8.0+生产者+示例https://cwiki.apache.org/confluence/display/kafka/consumer+group+示例
至于#2,假设您的键
是一个长
。假设你有10个分区。在分区之间分配long
的一种方法是简单地执行模数操作键%num_partitions
。但现在想想当您添加分区时会发生什么。根据num_partitions
的当前值,已经写入的消息将位于错误的分区中。这是说Kafaka不会自动为你重新划分任何东西。
假设答案是,是的,Kafka不会再平衡,那么有什么解决方案可以让Kafka在各种情况下平衡
有什么方法实现这一点吗? 我尝试创建了一个全新的主题,对于这个主题,它似乎使用了3个分区,这与文件中的更改一致。然而,对于已有的话题,它似乎并不在意。
问题内容: 我有一个带有2个分区的Kafka集群。我一直在寻找一种将分区数增加到3的方法。但是,我不想丢失该主题中的现有消息。我尝试停止Kafka,修改文件以将分区数增加到3,然后重新启动Kafka。但是,这似乎并没有改变任何东西。使用Kafka ,我仍然看到它仅使用2个分区。我正在使用的Kafka版本是0.8.2.2。在0.8.1版中,曾经有一个名为的脚本,我想可能可以解决问题。但是,我在0.8
关于Kafka,我有以下几个问题: > 如果我创建一个主题,并且指定的分区数多于代理数,那么单个代理将处理多个分区? 如果我创建了一个主题,并且指定的复制因子大于没有代理,那么该主题会创建还是不会创建? 一个代理可以处理不同主题的多个分区。
我有一个有几个消费者的消费群体。每个使用者被分配到一组分区。消费者何时轮询选择了已使用分区的消息?它是在消费者端完成的,还是Kafka服务器决定使用哪个分区? 我的一些分区有很多消息,但有些分区没有或几乎没有。但我仍然需要我的消费者平等地使用分配给它的每个分区。因此,我需要我的消费者快速遍历分区,最好从每个分配的分区轮询x条消息。 我在用https://github.com/appsignal/r
本文向大家介绍Kafka 分区数可以增加或减少吗?为什么?相关面试题,主要包含被问及Kafka 分区数可以增加或减少吗?为什么?时的应答技巧和注意事项,需要的朋友参考一下 我们可以使用 bin/kafka-topics.sh 命令对 Kafka 增加 Kafka 的分区数据,但是 Kafka 不支持减少分区数。 Kafka 分区数据不支持减少是由很多原因的,比如减少的分区其数据放到哪里去?是删除,