当前位置: 首页 > 知识库问答 >
问题:

Kafka溪流圆形

许博易
2023-03-14

我写了一个kafka流代码,使用kafka 2.4 kafka客户端版本和kafka 2.2服务器版本。我的主题有50个分区

我的Kafka流代码有选择键()DSL操作,我有200万条记录使用相同的KEY。在流配置中,我已经完成了

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,RoundRobinPartitioner.class);

因此,我能够使用完全相同的键使用不同的分区。如果我没有按预期使用轮循机制,我的所有消息都会转到同一分区。

直到现在一切都很好,但我意识到;当我使用RoundRobin分区类时,我的消息就像~40个分区。10个分区处于空闲状态。我想知道我缺少了什么?它应该使用50个分区,大约200万条记录,对吗?

      final KStream<String, IdListExportMessage> exportedDeviceIdsStream =
            builder.stream("deviceIds");

        // k: appId::deviceId, v: device
        final KTable<String, Device> deviceTable = builder.table(
            "device",
            Consumed.with(Serdes.String(), deviceSerde)
        );
            // Some DSL operations
            .join(
                deviceTable,
                (exportedDevice, device) -> {
                    exportedDevice.setDevice(device);

                    return exportedDevice;
                },
                Joined.with(Serdes.String(), exportedDeviceSerde, deviceSerde)
            )
            .selectKey((deviceId, exportedDevice) -> exportedDevice.getDevice().getId())
            .to("bulk_consumer");

   props.put(StreamsConfig.STATE_DIR_CONFIG, /tmp/kafka-streams);
   props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
   props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
   props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
   props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
   props.put("num.stream.threads", 10);
   props.put("application.id", applicationId);

RoundRobinPartitioner.java

public class RoundRobinPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

    public RoundRobinPartitioner() {
    }

    public void configure(Map<String, ?> configs) {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = this.nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return ((PartitionInfo)availablePartitions.get(part)).partition();
        } else {
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> {
            return new AtomicInteger(0);
        });
        return counter.getAndIncrement();
    }

    public void close() {
    }
}

共有1个答案

卓麒
2023-03-14

您不能使用ProducerConfig.PARTITIONER_CLASS_CONFIG配置更改分区--这仅适用于普通生产者。

在Kafka Streams中,您需要实现接口Streams分区机,并将您的实现传递给相应的运算符,例如,到("主题",Produced.streamPartitioner(新的My分区机())

 类似资料:
  • 在Spring Boot应用程序中,我试图配置Kafka流。用简单的Kafka主题,一切都很好,但我无法得到工作SpringKafka流。 这是我的配置: 我想创建一个基于主题的流。应用一个简单的转换并将此流中的消息发送到test主题。 我向发送以下消息,其中是我自己的复杂类型,但是我现在不知道如何将它转换为中的,以便能够在中使用它。 请建议如何使其工作。

  • 我对流媒体有一个普遍的问题,但对于问题的范围,让我们限制自己使用Kafka Streams。让我们进一步缩小范围,将我们的问题局限于单词计数,或者可能是一般的计数。假设我有一个由某个键和一个值组成的流,键可以是一个字符串(假设我们可以有很多字符串,除了空字符串,由世界上的任何字符组成),值是一个整数,现在我们正在构建一个单词计数应用程序,如果词汇表中的单词总数是一万亿,我们不能将它们存储在本地缓存

  • 我正在使用Kafka Streams,我注意到它使我的kafka日志记录了很多日志消息,例如: 这真的很令人不安,因为我发现它会淹没日志,所以我看不到任何其他内容(也会消耗资源)。 为什么它发生在(一些)Kafka Streams内部主题上,而不是其他主题上? 我怎样才能禁用它?

  • 我有一个通用的Streams API问题,我想“高效地”解决。假设我有一个(可能非常大,可能无限)流。我想以某种方式对其进行预处理,例如,过滤掉一些项目,并对一些项目进行变异。让我们假设这个预处理是复杂的,时间和计算密集型的,所以我不想做两次。 接下来,我想对项序列执行两组不同的操作,并使用不同的流类型构造处理每个不同序列的远端。对于无限流,这将是一个forEach,对于有限流,它可能是一个收集器

  • 我想知道我是否能做这样的事情。假设我有一个数字流1-20。我想利用一个特性,比如drop 3(我想用Java术语来说是限制还是跳过?)并产生一个流,即数字流: 1-20、4-20、7-20等 然后可能平坦地将这些全部映射到一条溪流中。我尝试了使用Stream.iterate的各种组合,主要是从流生成流,但我一直收到一个IllegalStateExcema,说流已经操作或关闭。 例如,人们可能期望这