当前位置: 首页 > 知识库问答 >
问题:

普通生产者Kafka流的自定义分割器

章越
2023-03-14

我有一个kafka streams应用程序

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);

这是一个类,用于将消息分发到不同的分区,即使在kafka 2.4版本中使用相同的键

RoundRobinPartitioner具有以下实现:

public class RoundRobinPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

    public RoundRobinPartitioner() {
    }

    public void configure(Map<String, ?> configs) {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = this.nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return ((PartitionInfo)availablePartitions.get(part)).partition();
        } else {
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> {
            return new AtomicInteger(0);
        });
        return counter.getAndIncrement();
    }

    public void close() {
    }
}

我的分区器由完全相同的代码组成,但分区方法实现不同,我的代码块是:

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

        int numPartitions = partitions.size();

        int nextValue = nextValue(topic);

        return Utils.toPositive(nextValue) % numPartitions;

    }

当我这样配置时,消息在两种实现中都被分发到不同的分区,但决不使用某些分区。

我有50个分区,而分区14和34从未收到消息。我的分区不是没有价值的。它们是可用的。当我将返回分区方法更改为14或34时,所有消息都会转到该分区。会有什么问题?这两种实现都没有按预期工作。

编辑1:我试过普通制作人的RoundRobinPartitioner。结果是一样的。生产者不能在多个分区之间平等地生成消息,有些分区从未使用过。原因可能是什么?它不像是缺少的配置。

编辑2:我有调试RoundRobin分区,并把一个断点在返回。当我只产生一条消息时,生产者产生两次消息。第一次尝试总是不成功,并且消息不会进入任何分区。当我在调试时点击继续时,并发映射的索引会增加1。制作人的第二次尝试是成功的。

分区()方法被调用的东西,我还找不到。

编辑3:这可能与我没有覆盖的onNewBatch方法有关吗?

编辑4:这个实现适用于kafka客户端2.2,但不适用于2.4。分区接口没有onNewBatch方法。当键为空2.2 vs 2.4时,默认分区程序实现会更改。它能与棍子隔断有关吗?

共有1个答案

东方俊明
2023-03-14

使用UniformStickyPartitioner。在kafka 2.4客户端版本中初始化。圆机器人切割器。类适用于Kafka2.2或更低版本。在2.4版本中

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, UniformStickyPartitioner.class);

应该使用。我认为这与新的StickPartitioner有关。

 类似资料:
  • 我有一个主题中的多个事件,我试图在这些步骤中处理: 根据标题值过滤事件 应用反序列化程序 按键分组 聚合以生成新的KTable 新KTable将以流式传输方式传输到与具有新标题的新事件相同的主题 我可以使用transformValues访问标题,但不确定在执行toStream时如何注入新的标题值。 注意:我是KStream的新手。

  • 我有一个kafka集群,有三个代理和一个主题,复制因子为三个分区和三个分区。我可以看到,每个代理都有相同大小的所有分区的日志副本。这个主题有两位制作人。 有一天,我把一个制片人的写作量减少了一半。然后我发现所有三个代理的入站流量都减少了,这是意料之中的,但只有分区1的领导节点的出站流量减少了,我不明白。 由于复制,分区主管的出站流量减少。但是每个代理都是一个分区的领导者,为什么只有一个领导者的出站

  • 我正在使用Kafka生产者发布消息到一些其他Kafka的主题,它的工作相当好。下面的示例模板: 上述语句是否支持kafka消息驱动入站通道适配器中所支持的errorchannel? 每当我传出的kafka服务器宕机,我无法发布它时,我需要它来审计错误计数。

  • 我想在Kafka上用Flink设置一个基本的生产者-消费者,但是我很难通过Java向现有消费者生成数据。 CLI解决方案 > 我设置了一个使用zip from 和 我使用创建了一个名为transactions1的主题 现在我可以在命令行上使用生产者和消费者来查看主题已经创建并工作。 设置我运行的消费者 现在,如果任何制作人向主题发送数据,我将在消费者控制台中看到它。 我通过运行 并在cli中的生产

  • 我编写了一个spring kafka包,使用spring boot将消息发送到kafka主题,其中“Key”作为字符串,“Arraylist”作为值。“Custom Object”是一个具有属性item id、item name和item ordered count的类。 Kafka制作人日志如下所示。 我编写了一个自定义序列化程序,如下所示。 “Arraylist”的Serde类如下所示。 Ka

  • 我正在实现一个自定义消费者的主题/分区分配在Kafka。为此,我将重写抽象类,该类又实现接口。 作为自定义赋值器的一部分,我希望发送一个关于消费者订阅的每个主题的每个分区的单个(浮动)信息。 我知道可以通过重写接口的默认方法向赋值器发送自定义数据。 但是,问题是,从上面的方法签名中,我无法获得为使用者注册的每个主题分配给带下划线使用者的分区列表。 谢谢你。