当前位置: 首页 > 知识库问答 >
问题:

kafka -分区键放入错误分区的消息

马弘和
2023-03-14

我想知道,在什么情况下,具有相同分区键的消息会进入不同的分区。

我使用下面给出的命令运行了属于同一组的两个消费者在控制台中监听一个主题:

sudo /etc/kafka/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --
bootstrap-server localhost:9092 --topic demo3  --consumer.config 
config/consumer.properties --property print.key=true --property 
key.separator=:

我使用“纳米/Kafka-php”库将消息放入带有键 abc 的主题演示 3。当我发送多个这样的消息时,我发现很少有消息转到第二个消费者,而大多数消息都发送给消费者1。

由于我对所有消息使用相同的密钥abc,因此我希望所有消息都由同一个使用者使用。每个使用者都绑定到每个分区。

我使用以下代码来生成消息:

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('x.x.x.x:9092', 'y.y.y.y:9092');

$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer(
function() {
    return [
        [
            'topic' => 'demo3',
            'value' => 'test message.',
            'key' => 'abc',
        ],
    ];
}
);

$producer->success(function($result) {
 var_dump($result);
});
$producer->error(function($errorCode) {
    var_dump($errorCode);
});
$producer->send(true);

此屏幕截图清楚地显示,已向一个消费者发送了3条消息,另一条发送给另一个消费者

共有2个答案

安经纶
2023-03-14

根据 Java 文档,

如果指定了有效的分区号,则在发送记录时将使用该分区。

如果未指定分区但存在键,则将使用键的哈希选择分区。

如果键和分区都不存在,将以循环方式分配一个分区

通啸
2023-03-14

你说的是真的,你应该看到相同的消费者使用相同的密钥“abc”的消息。你能检查重新平衡是否开始,也许第一个消费者离开分区到另一个吗?或者,使用这个php kafka producer(我从未使用过)可以跟踪每个消息放在哪个分区。kafka生产者应该在发送消息时获得RecordMetadata信息,以便知道消息被分配到哪个分区和偏移量。因为是生产者决定目标分区,所以您应该确保php kafka生产者工作正常。

 类似资料:
  • 我对Kafka是新的,所以道歉,如果我听起来很愚蠢,但我目前所理解的是…消息流可以定义为主题,就像类别一样。并且每个主题被分成一个或多个分区(每个分区可以有多个副本)。所以它们是平行的 他们说Kafka的主要网站 生成器能够选择将哪个消息分配给主题中的哪个分区。这可以通过循环的方式简单地平衡负载,也可以根据某个语义分区函数(例如基于消息中的某个键)来完成。 在0.8 beta版中创建produce

  • 我知道kafka将一个主题的数据安排在许多分区上,一个消费者组中的消费者被分配到不同的分区,从那里他们可以接收数据: 我的问题是: 术语,它们是由主机/IP标识的,还是由客户端连接标识的? 换句话说,如果我启动两个线程或进程,使用相同的消费者组运行相同的Kafka客户端代码,它们被认为是一个消费者还是两个消费者?

  • 我正在用单个主题和多个分区实现kafka producer。我通过消息中的一个特定值(消息json中的feedName属性值)选择消息到哪个分区。我正在为feedName-partitionId映射维护一个SQL表。我的问题是,leader和副本的分区Id是否相同?如果不同,如何在所有代理中唯一地标识分区?

  • 我正在努力解决如何正确使用分区键机制的问题。我的逻辑是将分区号设置为3,然后创建三个分区键作为“0”,“1”,“2”,然后使用分区键创建三个键控消息,例如 < li>KeyedMessage(主题,“0”,消息) < li>KeyedMessage(topic," 1 ",message) < li>KeyedMessage(主题,“2”,消息) 之后,创建一个生产者实例来发送所有KeyedMes

  • 根据Apache Kafka文档,消息的顺序可以在一个分区或一个主题中的一个分区内实现。在这种情况下,我们得到的并行性好处是什么,它相当于传统的MQ,不是吗?

  • 定义了一个自定义存储,用于自定义变压器(参考下面)。 https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java 我得到以下例外。不确定,为什么内部主题“test_01