当前位置: 首页 > 知识库问答 >
问题:

groupByKey创建重新分区主题,即使没有键更改

晋鹤轩
2023-03-14

我试图在kafka streams(kafka 1.0.1)和spring cloud stream(2.0.0-build-snapshot)的帮助下实现一个简单的事件源服务。我的StreamListener方法只是读取与聚合状态变化相对应的Kstream事件,并将它们应用到聚合上,并将最新的状态保存在本地状态存储(kafka提供的状态存储)中。域事件消息也具有与聚合的uuid(字符串)相同的键。代码如下:

@StreamListener(Channels.EVENTS_INPUT_CHANNEL)
public void listen(KStream<String, DomainEvent> stream) {
    Serde<DomainEvent> domainEventSerde = new JsonSerde<>(DomainEvent.class);
    Serde<Slot> slotSerde = new JsonSerde<>(Slot.class);
    stream
        .groupByKey(Serialized.with(Serdes.String(), domainEventSerde))
        .aggregate(
                Slot::new, 
                (s, domainEvent, slot) -> slot.handle(domainEvent),
                Materialized.<String, Slot, KeyValueStore<Bytes, byte[]>>
                as(Repository.SNAPSHOTS_FOR_SLOTS)
                    .withKeySerde(Serdes.String()).withValueSerde(slotSerde)
        );
}

上面的代码生成了一个changelog主题(如预期的那样):slot-service-slots-changelog。尽管它还创建了一个重新分区主题:slot-service-slots-repartition。这两个主题似乎有完全相同的消息(键和值)。我的理解是,如果流上没有修改键的操作,就不需要重新分区。我是不是漏掉了什么?

Update:正如sobychacko所提供的解释,这可能不再需要,但是我尝试了如下所示的没有云流绑定的情况,它没有创建重新分区主题:

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfiguration {

    @Bean
    KafkaTemplate<String, DomainEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    ProducerFactory<String,DomainEvent> producerFactory() {
        return new DefaultKafkaProducerFactory<>(config());
    }

    private Map<String, Object> config() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return config;
    }

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    StreamsConfig streamsConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "slot-service");
        return new StreamsConfig(config);
    }

    @Bean
    KTable<String, Slot> kTable(KStreamBuilder builder) {
        Serde<DomainEvent> domainEventSerde = new JsonSerde<>(DomainEvent.class);
        Serde<Slot> slotSerde = new JsonSerde<>(Slot.class);

        return
                builder
                .stream(Serdes.String(), domainEventSerde, Repository.SLOT_EVENTS)
                .groupByKey(Serdes.String(), domainEventSerde)
                .aggregate(
                    Slot::new, 
                    (s, domainEvent, slot) -> slot.handle(domainEvent),
                    slotSerde,
                    Repository.SNAPSHOTS_FOR_SLOTS);
    }

    }
@Autowired
    public Repository(KafkaTemplate<String, DomainEvent> kafkaTemplate, KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {
        this.kafkaTemplate = kafkaTemplate;
        this.kStreamBuilderFactoryBean = kStreamBuilderFactoryBean;
    }

    public void save(Slot slot) {
        List<DomainEvent> newEvents = slot.getDirtyEvents();
        newEvents.forEach(
            domainEvent -> kafkaTemplate.send(SLOT_EVENTS, domainEvent.aggregateUUID().toString(),domainEvent) 
        );
        slot.flushEvents();
    }

更新2:

下面是带有云流的生产者代码:

public void save(Slot slot) {
        List<DomainEvent> newEvents = slot.getDirtyEvents();
        newEvents.forEach(domainEvent -> channels.eventsOutputChannel().send(MessageBuilder.withPayload(domainEvent)
                .setHeader(KafkaHeaders.MESSAGE_KEY, slot.getUuid().toString()).build()));
        slot.flushEvents();
    }

共有1个答案

邹玮
2023-03-14

在我们执行入站反序列化的地方,在调用方法之前会发生一个map()操作(我假设在上面的示例中禁用了本机反序列化)。正如Matthias指出的,如果有map()操作,该操作将设置一个标志,并在随后的groupbykey()中创建一个重新分区主题。因此,当框架为您执行map操作作为入站消息转换的一部分时,在您的情况下可能会发生这种情况。如果您确实希望避免创建此重新分区主题,可以启用nativeDecoding,然后使用Kafka提供的Serde。这样,框架就不会调用map操作。问题是代码中使用的JSONSERDE不容易用作Spring Cloud Stream中的Serde属性,因为它需要类信息。在Spring云流的下一个版本中,我们准备对这种情况进行改进。同时,您可以提供一个自定义Serde。希望这能有所帮助。

 类似资料:
  • Kafka只提供一个分区内消息的总顺序,而不提供主题中不同分区之间的消息的总顺序。每分区排序与按键对数据进行分区的能力相结合,对于大多数应用程序来说已经足够了。但是,如果您需要消息的总顺序,这可以通过只有一个分区的主题来实现,尽管这意味着每个使用者组只有一个使用者进程。 下面是我的问题: > 这是否意味着如果我希望有多个消费者(来自同一组)阅读一个主题,我需要有多个分区? 分区是如何编号的?从0开

  • null camel-kafka中是否有任何配置,我们可以使用它来增加kafka主题分区计数?

  • 我对cosmos DB的分区密钥感到困惑。我有一个数据库/容器,大约有4000条小记录。如果我使用分区键筛选器尝试sql语句,则RUs和持续时间会比不使用时长更大。 有人明白这一点吗? 在此示例中,容器的分区键是/partitionkey

  • 假设我有一个1.2 GB的文件,那么考虑到128 MB的块大小,它将创建10个分区。现在,如果我将其重新分区(或合并)为4个分区,这意味着每个分区肯定会超过128 MB。在这种情况下,每个分区必须容纳320 MB的数据,但块大小是128 MB。我有点糊涂了。这怎么可能?我们如何创建一个大于块大小的分区?

  • 问题内容: 我正在辩论应该如何学习。主要是,如果我从数据库中获取大量信息以将其加载到页面上,那么最好的方式是操纵页面而不需要重新加载。使用XML之类的东西是最好的选择,还是我应该使用的其他东西。我知道其中很多使用HTML Dom和XML Dom,但是我不想开始学习我不需要的东西。 问题答案: 您正在寻找的技术名称是AJAX,它代表异步Javascript和XML(尽管目前大多数AJAX使用JSON