当前位置: 首页 > 知识库问答 >
问题:

使用一组压缩主题中的所有记录的最简单的Spring Kafka@Kafkalistener配置是什么?

熊锐进
2023-03-14
@KafkaListener(topicPartitions = @TopicPartition(topic = "compacted",
            partitions = "#{@finder.partitions('compacted')}"),
            partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
    // process record
}

由于我有几个主题,结果代码非常冗长:

@KafkaListener(topicPartitions = {
        @TopicPartition(
                topic = "${topic1}",
                partitions = "#{@finder.partitions('${topic1}')}",
                partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
        ),
        @TopicPartition(
                topic = "${topic2}",
                partitions = "#{@finder.partitions('${topic2}')}",
                partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
        ),
        // and many more @TopicPartitions...
        @TopicPartition(
                topic = "${topicN}",
                partitions = "#{@finder.partitions('${topicN}')}",
                partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
        )
})
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
    // process record
}

如何通过配置@kafkalistener注释的topicpartitions属性,使这种重复配置更加简洁?

共有1个答案

松元明
2023-03-14

@kafkalistener目前不可能-请在GitHub上打开一个新的特性问题。

我能想到的唯一工作是从容器工厂以编程方式创建一个侦听器容器,并创建一个侦听器适配器。如果你需要,我可以提供一个例子。

编辑

@SpringBootApplication
public class So64022266Application {

    public static void main(String[] args) {
        SpringApplication.run(So64022266Application.class, args);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so64022266-1").partitions(10).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so64022266-2").partitions(10).replicas(1).build();
    }

    @Bean
    ConcurrentMessageListenerContainer<String, String> container(@Value("${topics}") String[] topics,
            PartitionFinder finder,
            ConcurrentKafkaListenerContainerFactory<String, String> factory,
            MyListener listener) throws Exception {

        MethodKafkaListenerEndpoint<String, String> endpoint = endpoint(topics, finder, listener);
        ConcurrentMessageListenerContainer<String, String> container = factory.createListenerContainer(endpoint);
        container.getContainerProperties().setGroupId("someGroup");
        return container;
    }

    @Bean
    MethodKafkaListenerEndpoint<String, String> endpoint(String[] topics, PartitionFinder finder,
            MyListener listener) throws NoSuchMethodException {

        MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setBean(listener);
        endpoint.setMethod(MyListener.class.getDeclaredMethod("listen", String.class, String.class));
        endpoint.setTopicPartitions(Arrays.stream(topics)
            .flatMap(topic -> finder.partitions(topic))
            .toArray(TopicPartitionOffset[]::new));
        endpoint.setMessageHandlerMethodFactory(methodFactory());
        return endpoint;
    }

    @Bean
    DefaultMessageHandlerMethodFactory methodFactory() {
        return new DefaultMessageHandlerMethodFactory();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template,
            ConcurrentMessageListenerContainer<String, String> container) {

        return args -> {
            System.out.println(container.getAssignedPartitions());
            template.send("so64022266-1", "key1", "foo");
            template.send("so64022266-2", "key2", "bar");
        };
    }

}

@Component
class MyListener {

    public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
        System.out.println(key + ":" + payload);
    }

}

@Component
class PartitionFinder {

    private final ConsumerFactory<String, String> consumerFactory;

    public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    public Stream<TopicPartitionOffset> partitions(String topic) {
        System.out.println("+" + topic + "+");
        try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
            return consumer.partitionsFor(topic).stream()
                    .map(part -> new TopicPartitionOffset(topic, part.partition(), 0L));
        }
    }

}
topics=so64022266-1, so64022266-2
 类似资料:
  • 我有@KafkaListener使用topicPattern与正则表达式,工作正常(foo。*),但现在我想将侦听器分配给所有匹配主题的所有分区。 https://docs.spring.io/spring-kafka/docs/2.6.1/reference/html/#tip-assign-all-parts并没有真正帮助我,因为我不知道主题名称。

  • 问题内容: 我的故事: 我想做的事情像将行记录到文件的最简单的log4j记录器一样简单。我发现了几个具有某些功能的示例,但没有一个真正有效的基本通用示例,而没有一个解释每一行如何工作的示例。 题: 有人可以提供吗? 先决条件: 我已经知道将文件放在哪里,并且已经配置了log4j并可以用于控制台日志记录。 现在,我想登录到文件,并在程序运行后从文件系统中查找文件。 需要添加到现有文件的行是所需的输出

  • 我有一个Kafka主题,通过设置“清理”启用了压缩。政策=紧凑。我的部分。字节属性设置为稍大一点的值(100Mb),这样我的代理运行良好。 如果我有一个Kafka流应用程序,它使用的主题是GlobalKTable,并且在主题中的单个分区键有多个记录,该应用程序将在GlobalKTable中只接收1个记录,还是在压缩开始之前有两个记录?

  • 问题内容: 我有一个ZIP文件目录(在Windows计算机上创建)。我可以使用手动解压缩它们,但是如何通过外壳解压缩当前文件夹中的所有ZIP文件呢? 使用Ubuntu Linux服务器。 问题答案: 根据以下链接,这可以在bash中运行: 解压缩\ *。zip

  • 我在《掌握Kafka Streams and ksqlDB》一书中遇到了以下两个短语,作者使用了两个术语,它们的真正含义是“压缩主题”和“未压缩主题” 他们对“日志压缩”有什么看法吗? 表可以被认为是对数据库的更新。在日志的这种视图中,只保留每个键的当前状态(给定键的最新记录或某种聚合)。表通常是从压缩的主题构建的。 用数据库的说法,流可以被视为插入。每个不同的记录都保留在此日志视图中。流通常是从

  • 我正在使用kafka:Kafka2.12-2.1.0,在客户端使用spring kafka,但遇到了一个问题。 我需要通过阅读Kafka主题中的所有现有消息来加载内存中的映射。为此,我启动了一个新的使用者(具有唯一的使用者组id,并将偏移量设置为)。然后我迭代使用者(poll方法)以获取所有消息,并在使用者记录变为空时停止。 我尝试了很少的其他方法(比如使用偏移量数),但还没有找到任何解决方法,除