当前位置: 首页 > 知识库问答 >
问题:

为Kafka在线添加分区或代理是否安全?

章晗日
2023-03-14

谢谢你的耐心。

  1. 在线向主题添加分区后,kafka使用者将停止读取消息,并且不会抛出异常。消费者只是阻拦。每次我们必须重新启动消费者。我认为这是不合理的,我找不到任何关于它的文件。

而且,当处理消息发生错误时,使用者线程不会恢复。我们的消费者读取消息并将其插入MySQL。一旦网络出现故障,consumer无法连接到MySql,然后它阻塞并停止读取消息,直到我们重新启动它。

“要知道,分区的一个用例是对数据进行语义分区,而添加分区并不会改变现有数据的分区,因此如果用户依赖于该分区,这可能会干扰他们。也就是说,如果数据是由hash(key)%number_of_partitions分区的,那么该分区可能会通过添加分区而被洗牌,但Kafka不会尝试以任何方式自动重新分发数据。”

“不尝试自动重新分发数据”是什么意思?旧数据不变,新数据不发到添加的分区?

kafka.producer.async.ProducerSendThread.error():103: - Error in handling batch of 65 events
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
  at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) ~[kafka_2.9.2-0.8.2.0.jar:na]
 at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) [kafka_2.9.2-0.8.2.0.jar:na]
     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) [kafka_2.9.2-0.8.2.0.jar:na]
       at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) [kafka_2.9.2-0.8.2.0.jar:na]
 at scala.collection.immutable.Stream.foreach(Stream.scala:526) [scala-library-2.9.2.jar:na]
   at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) [kafka_2.9.2-0.8.2.0.jar:na]
      at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [kafka_2.9.2-0.8.2.0.jar:na]

kafka.producer.async.DefaultEventHandler.error():97: - Failed to send requests for topics risk_acts with correlation ids in [433266,433395]

在添加新的经纪人时也会出现同样的问题。我们必须将新的代理主机名和端口添加到producer中的“metadata.broker.list”配置并重新启动它。

我们使用的是高级api,而kafka的版本是:

<dependency>
      <groupId> org.apache.kafka</groupId >
      <artifactId> kafka_2.9.2</artifactId >
      <version> 0.8.2.0</version >
</dependency>

生产者配置:

<entry key="metadata.broker.list" value="${metadata.broker.list}" />
<entry key="serializer.class" value="kafka.serializer.StringEncoder" />
<entry key="key.serializer.class" value="kafka.serializer.StringEncoder" />
<entry key="request.required.acks" value="-1" />
<entry key="producer.type" value="async" />
<entry key="queue.enqueue.timeout.ms" value="-1" />
<entry key="compression.codec" value="1" />

使用者配置:

<entry key="zookeeper.connect" value="${zookeeper.connect}" />
<entry key="group.id" value="${kafka.consumer.group.id}" />
<entry key="zookeeper.session.timeout.ms" value="40000" />
<entry key="rebalance.backoff.ms" value="10000" />
<entry key="zookeeper.sync.time.ms" value="2000" />
<entry key="auto.commit.interval.ms" value="1000" />
<entry key="auto.offset.reset" value="smallest" />

生产者代码和消费者代码类似于:https://cwiki.apache.org/confluence/display/kafka/0.8.0+生产者+示例https://cwiki.apache.org/confluence/display/kafka/consumer+group+示例

共有1个答案

东门胤
2023-03-14

至于#2,假设您的是一个。假设你有10个分区。在分区之间分配long的一种方法是简单地执行模数操作键%num_partitions。但现在想想当您添加分区时会发生什么。根据num_partitions的当前值,已经写入的消息将位于错误的分区中。这是说Kafaka不会自动为你重新划分任何东西。

 类似资料:
  • 假设答案是,是的,Kafka不会再平衡,那么有什么解决方案可以让Kafka在各种情况下平衡

  • 有什么方法实现这一点吗? 我尝试创建了一个全新的主题,对于这个主题,它似乎使用了3个分区,这与文件中的更改一致。然而,对于已有的话题,它似乎并不在意。

  • 问题内容: 我有一个带有2个分区的Kafka集群。我一直在寻找一种将分区数增加到3的方法。但是,我不想丢失该主题中的现有消息。我尝试停止Kafka,修改文件以将分区数增加到3,然后重新启动Kafka。但是,这似乎并没有改变任何东西。使用Kafka ,我仍然看到它仅使用2个分区。我正在使用的Kafka版本是0.8.2.2。在0.8.1版中,曾经有一个名为的脚本,我想可能可以解决问题。但是,我在0.8

  • 关于Kafka,我有以下几个问题: > 如果我创建一个主题,并且指定的分区数多于代理数,那么单个代理将处理多个分区? 如果我创建了一个主题,并且指定的复制因子大于没有代理,那么该主题会创建还是不会创建? 一个代理可以处理不同主题的多个分区。

  • 我有一个有几个消费者的消费群体。每个使用者被分配到一组分区。消费者何时轮询选择了已使用分区的消息?它是在消费者端完成的,还是Kafka服务器决定使用哪个分区? 我的一些分区有很多消息,但有些分区没有或几乎没有。但我仍然需要我的消费者平等地使用分配给它的每个分区。因此,我需要我的消费者快速遍历分区,最好从每个分配的分区轮询x条消息。 我在用https://github.com/appsignal/r

  • 本文向大家介绍Kafka 分区数可以增加或减少吗?为什么?相关面试题,主要包含被问及Kafka 分区数可以增加或减少吗?为什么?时的应答技巧和注意事项,需要的朋友参考一下 我们可以使用 bin/kafka-topics.sh 命令对 Kafka 增加 Kafka 的分区数据,但是 Kafka 不支持减少分区数。 Kafka 分区数据不支持减少是由很多原因的,比如减少的分区其数据放到哪里去?是删除,