当前位置: 首页 > 知识库问答 >
问题:

为什么使用Kafka绑定器哈希键的Spring Cloud Stream与标准Kafka Producer不同?

晏昀
2023-03-14

我遇到了一个问题,我需要将一个现有的主题(< code>source)重新分区到一个新的主题(< code>target),这个新主题具有更大的分区数量(是以前分区数量的倍数)。

Source主题是使用Kafka Binder使用Spring Cloud Stream写入的。目标主题正在使用KStreams应用程序写入。

< code>source主题中的记录基于标题进行分区,其中< code>key=null。我试图显式提取这个头,并为< code>target主题中的记录设置一个消息键,并注意到具有相同分区键的记录位于完全不同的分区中。

经过一番调查,我发现罪魁祸首如下:

org.springframework.cloud.stream.binder.PartitionHandler.DefaultPartitionSelector

    private static class DefaultPartitionSelector implements PartitionSelectorStrategy {

        @Override
        public int selectPartition(Object key, int partitionCount) {
            int hashCode = key.hashCode();
            if (hashCode == Integer.MIN_VALUE) {
                hashCode = 0;
            }
            return Math.abs(hashCode);
        }
    }

< code > org . spring framework . cloud . stream . binder . partition handler

    public int determinePartition(Message<?> message) {
        // ... non relevant code omitted

        partition = this.partitionSelectorStrategy.selectPartition(key,
                    this.partitionCount);
        // protection in case a user selector returns a negative.
        return Math.abs(partition % this.partitionCount);

虽然默认的 Kafka 分区策略可以:

org.apache.kafka.clients.producer.internals.DefaultPartitioner

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                         int numPartitions) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        }
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

本质上,使用Spring Cloud Stream之外的东西永远不允许与非Spring Cloud Stream应用程序编写的主题进行共同分区,除非使用自定义分区器(这并不难做到)。

但是,应该注意的是,上面的 DefaultPartitionSelector 并不位于 Kafka Binder 模块中,而是位于更高级别的 spring-cloud-stream 模块中。

这种设计选择的理由是什么?我认为默认的分区器适用于所有的绑定器,而不仅仅是Kafka,但是为什么Kafka绑定器不实现它自己的分区器,默认情况下它允许与非Spring Cloud Stream应用程序进行开箱即用的联合分区?

共有1个答案

牛越
2023-03-14

正如我在评论中所说

活页夹级别的分区适用于本机不支持分区的基础设施;不要使用它,让Kafka自己进行分区。

也就是说,你的意思并不完全清楚;Spring分区器是很久以前编写的,早于KIP 480引入的粘性缓存。但是,如果应用程序重新启动时分区数量发生变化,即使是那个分区器也会更改分区——如果有密钥,它会根据分区数量进行修改;如果没有密钥,则会选择一个随机(粘性)分区。

用10个分区,然后是20个分区运行这个,你会看到。

@SpringBootApplication
public class So73207602Application {

    public static void main(String[] args) {
        SpringApplication.run(So73207602Application.class, args).close();
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template, NewTopic topic, KafkaAdmin admin) {
        return args -> {
            System.out.println(template.send("topic1", "foo", "bar").get().getRecordMetadata());
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1").partitions(10).replicas(1).build();
    }

}

用一个< code>null键,你每次都会得到一个不同的(随机的)分区。

 类似资料:
  • 也许我没有看到什么或者我忘记了在计算运行时考虑的事情,所以请告诉我。

  • 我正在开发一款Android应用程序。在我的应用程序中,我集成了Facebook登录。我的facebook登录工作正常。但当我制作release apk并运行该应用程序并尝试登录Facebook时,它就不工作了。请看下面我的场景。 我生成如下的发布apk 然后我使用jks文件路径生成keyhash。 我得到了一个散列键,然后将其添加到开发人员配置文件设置中。 当我在我的设备上安装并运行apk并使用

  • 问题内容: 我正在阅读Java 1.6 API提供的HashMap类的代码,无法完全理解以下操作的需要(位于put和get方法的主体中): 该方法具有以下主体: 通过对提供的哈希码执行位操作,可以有效地重新计算哈希。即使API声明如下,我也无法理解这样做的必要性: 这很关键,因为HashMap使用2的幂的哈希表,否则哈希表在低位无差异时会遇到冲突。 我确实知道键值参数存储在数据结构数组中,并且该数

  • 问题内容: Python MD5哈希与shell上md5sum命令创建的哈希不同。为什么? 问题答案: 追加a,因为您通常不希望外壳中的行不以换行符结尾(如果提示不是从最左边开始,则看起来很丑)。 使用参数省略尾随的换行符,它将输出与您的python脚本相同的校验和:

  • 问题内容: 我偶然发现了一篇博客文章,详细介绍了如何在Python中实现powerset函数。因此,我尝试用自己的方式进行操作,并发现Python显然无法拥有一组集合,因为set无法哈希。这很烦人,因为功率集的定义是它是一组集合,而我想使用实际的集合操作来实现它。 Python集不可散列是否有充分的理由? 问题答案: 通常,在Python中只有不可变的对象才是可哈希的。的不可变的变体- -是哈希的

  • 问题内容: 某些哈希表方案(例如布谷鸟哈希或动态完美哈希)依赖于通用哈希函数的存在以及能够收集表现出冲突的数据并通过从通用哈希函数系列中选择一个新的哈希函数来解决这些冲突的能力。 。 不久前,我试图在以杜鹃哈希为后盾的Java中实现哈希表,并遇到了麻烦,因为尽管所有Java对象都有一个函数,但返回的值对于每个对象都是固定的(当然,除非对象更改)。这意味着如果没有用户提供外部家族的通用哈希函数,就不