当前位置: 首页 > 知识库问答 >
问题:

带有多个回复模板的Kafka侦听器容器

衡玄裳
2023-03-14

我有一个Kafka监听器,它有两个注册的处理程序,每个处理程序监听相同主题但模式类型不同的消息。监听器使用< code>@SendTo注释将结果转发到另一个主题,EOS由< code>@Transactional启用。

@KafkaListener(
        groupId = "groupId",
        clientIdPrefix = "kafka-async-api-commands-listener",
        topics = "cmd-topic",
        containerFactory = "kafkaAsyncApiCommandsListenerContainerFactory",
        errorHandler = "kafkaAsyncApiErrorHandler"
)
public class KafkaAsyncApiCommandsListener {

    @KafkaHandler
    @Transactional
    @SendTo("resp-topic")
    Message<FooResponse> Foo(FooCommand command) {
        FooResponse response = new FooResponse(command);
        return MessageBuilder
                .withPayload(response)
                .build();
    }

    @KafkaHandler
    @Transactional
    @SendTo("resp-topic")
    Message<BarResponse> calculateBar(BarCommand command) {
        BarResponse response = new BarResponse(command);
        return MessageBuilder
                .withPayload(response)
                .build();
    }
}

根据文档:

为了支持@SendTo,必须为侦听器容器工厂提供用于发送回复的KafkaTemboard(在其回复模板属性中)。

Kafka模板是一个参数化的类型,需要提供它将要产生的消息的键和值。我遇到了一个问题,想出一个模板来同时支持Foo响应Bar响应作为消息类型。这似乎是必须的,因为容器工厂只接受一个模板。

由于两个模板都希望共享整个配置基础(属性、错误处理程序),我可以实例化一个<code>KafkaTemplate类型的模板

@Configuration
public class KafkaAsyncApiProducerTemplatesConfig {

    private final String bootstrapServers;

    public KafkaAsyncApiProducerTemplatesConfig(@Value("${kafka.bootstrap-servers}") String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    @Bean("asyncApiResponseTemplate")
    public KafkaTemplate<UUID, Object> asyncApiResponseKafkaTemplate() {
        return new KafkaTemplate<>(kafkaAsyncApiProducerFactory());
    }

    private ProducerFactory<UUID, Object> kafkaAsyncApiProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }
}
@Component
public class KafkaAsyncApiCommandsListenerContainerFactory extends ConcurrentKafkaListenerContainerFactory<UUID, CalculateCustomerBalanceCommand> {

    private final String bootstrapServers;
    private final KafkaTemplate<UUID, Object> kafkaTemplate;

    public KafkaAsyncApiCommandsListenerContainerFactory(
            @Value("${kafka.bootstrap-servers}") String bootstrapServers,
            @Qualifier("asyncApiResponseTemplate") KafkaTemplate<UUID, Object> kafkaTemplate
    ) {
        super();
        this.bootstrapServers = bootstrapServers;
        this.kafkaTemplate = kafkaTemplate;
        this.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
        this.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000, 3)));
        this.setReplyTemplate(kafkaTemplate);
    }

    private Map<String, Object> consumerConfig() {
        return new HashMap<String, Object>() {{
            put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
            put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
            put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, UUIDDeserializer.class);
            put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
            put(JsonDeserializer.TRUSTED_PACKAGES, "*");
            put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        }};
    }
}

我对我的方法不满意,无论是因为通过使用Object作为值参数来绕过静态键入KafkaTemplate的方式,还是因为未来可能出现的潜在配置不灵活。假设我想为<code>FooResponse</code>和<code>BarResponse</code>消息提供明显不同的配置模板。这种差异需要不同的生产者配置。使用一个包含全部Object值类型的模板,不可能实现这一点。

有没有办法为侦听器容器提供多个回复模板,以便根据消息值类型动态选择?Spring Boot自动配置是否以某种方式尝试解决这种用例?我不能在我的项目中使用它,但不介意它的代码中的任何提示。也许只有通过实例化两个单独的侦听器容器(附加不同的回复模板)侦听同一主题才能满足这一要求?如果后一种方法是正确的,我如何确保跨多个侦听器容器正确传递消息(理想情况下使用精确一次的语义学)?

共有1个答案

柴霖
2023-03-14

有没有办法向侦听器容器提供多个回复模板,以便根据消息值类型动态选择?

号码

Spring 引导自动配置是否以某种方式尝试解决此类用例?

号码

然而,当返回类型是<代码>消息时

    /**
     * Send a message with routing information in message headers. The message payload
     * may be converted before sending.
     * @param message the message to send.
     * @return a Future for the {@link SendResult}.
     * @see org.springframework.kafka.support.KafkaHeaders#TOPIC
     * @see org.springframework.kafka.support.KafkaHeaders#PARTITION
     * @see org.springframework.kafka.support.KafkaHeaders#KEY
     */
    ListenableFuture<SendResult<K, V>> send(Message<?> message);

泛型类型只有在使用接受键和/或值的发送方法时才有意义。

您可以创建Kafka模板的子类并覆盖发送方法以使用适当的泛型类型调用委托模板,但这将毫无意义,除非您直接返回Fo响应Bar响应,而不是将它们组装成消息

 类似资料:
  • 服务器部件: 客户部分:io.js 消息组件 信息形式——发布过程的开始

  • 我是JComboBox的新手 我有4个JComboBox:专用、etudiant、annee和semestre。 每次更改所选项目并将结果添加到滚动窗格(groupe des matieres ouvertes)时,我都需要从其中的4个项目中获取所选项目

  • 我有一个关于正确配置kafka侦听器属性的问题-侦听器和advertised.listers。 在我的配置中,我设置了以下道具: 客户端使用 进行连接。我是否需要在侦听器和广告侦听器中具有相同的值。这里 是指向运行 kafka 代理的主机的 dns 记录。 在什么情况下,我希望它们保持不变和不同? 谢谢!

  • 要运行Kafka,需要在文件。有两种设置我不理解。 有人可以解释侦听器和广告侦听器属性之间的区别吗? 留档说: 侦听器:套接字服务器侦听的地址。 和 advertised.listeners:主机名和端口代理将向生产者和消费者做广告。 我什么时候必须使用哪个设置?

  • 我想知道如何在一个键事件中按下所有的键。例如,我想为Ctrl+F编写一个监听器,它可以切换全屏。如何检查在一个事件中是否同时按下了Ctrl和F?

  • 我正在使用版本来使用来自主题的消息。在使用者配置中,自动提交设置为,而设置为。与服务器协商为10秒。 在收到消息后,我将它的一部分保存到数据库中。我的数据库有时会非常慢,这会导致kafka侦听器会话超时: 组MyGroup得自动偏移量提交失败:无法完成提交,因为组已重新平衡并将分区分配给另一个成员.这意味着对poll()的后续调用之间的时间比配置的session.timeout.ms长,这通常意味