我有一个Kafka监听器,它有两个注册的处理程序,每个处理程序监听相同主题但模式类型不同的消息。监听器使用< code>@SendTo注释将结果转发到另一个主题,EOS由< code>@Transactional启用。
@KafkaListener(
groupId = "groupId",
clientIdPrefix = "kafka-async-api-commands-listener",
topics = "cmd-topic",
containerFactory = "kafkaAsyncApiCommandsListenerContainerFactory",
errorHandler = "kafkaAsyncApiErrorHandler"
)
public class KafkaAsyncApiCommandsListener {
@KafkaHandler
@Transactional
@SendTo("resp-topic")
Message<FooResponse> Foo(FooCommand command) {
FooResponse response = new FooResponse(command);
return MessageBuilder
.withPayload(response)
.build();
}
@KafkaHandler
@Transactional
@SendTo("resp-topic")
Message<BarResponse> calculateBar(BarCommand command) {
BarResponse response = new BarResponse(command);
return MessageBuilder
.withPayload(response)
.build();
}
}
根据文档:
为了支持@SendTo,必须为侦听器容器工厂提供用于发送回复的KafkaTemboard(在其回复模板属性中)。
Kafka模板是一个参数化的类型,需要提供它将要产生的消息的键和值。我遇到了一个问题,想出一个模板来同时支持Foo响应
和Bar响应
作为消息类型。这似乎是必须的,因为容器工厂只接受一个模板。
由于两个模板都希望共享整个配置基础(属性、错误处理程序),我可以实例化一个<code>KafkaTemplate类型的模板
@Configuration
public class KafkaAsyncApiProducerTemplatesConfig {
private final String bootstrapServers;
public KafkaAsyncApiProducerTemplatesConfig(@Value("${kafka.bootstrap-servers}") String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
@Bean("asyncApiResponseTemplate")
public KafkaTemplate<UUID, Object> asyncApiResponseKafkaTemplate() {
return new KafkaTemplate<>(kafkaAsyncApiProducerFactory());
}
private ProducerFactory<UUID, Object> kafkaAsyncApiProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
}
@Component
public class KafkaAsyncApiCommandsListenerContainerFactory extends ConcurrentKafkaListenerContainerFactory<UUID, CalculateCustomerBalanceCommand> {
private final String bootstrapServers;
private final KafkaTemplate<UUID, Object> kafkaTemplate;
public KafkaAsyncApiCommandsListenerContainerFactory(
@Value("${kafka.bootstrap-servers}") String bootstrapServers,
@Qualifier("asyncApiResponseTemplate") KafkaTemplate<UUID, Object> kafkaTemplate
) {
super();
this.bootstrapServers = bootstrapServers;
this.kafkaTemplate = kafkaTemplate;
this.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
this.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000, 3)));
this.setReplyTemplate(kafkaTemplate);
}
private Map<String, Object> consumerConfig() {
return new HashMap<String, Object>() {{
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, UUIDDeserializer.class);
put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
put(JsonDeserializer.TRUSTED_PACKAGES, "*");
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}};
}
}
我对我的方法不满意,无论是因为通过使用Object
作为值参数来绕过静态键入KafkaTemplate
的方式,还是因为未来可能出现的潜在配置不灵活。假设我想为<code>FooResponse</code>和<code>BarResponse</code>消息提供明显不同的配置模板。这种差异需要不同的生产者配置。使用一个包含全部Object
值类型的模板,不可能实现这一点。
有没有办法为侦听器容器提供多个回复模板,以便根据消息值类型动态选择?Spring Boot自动配置是否以某种方式尝试解决这种用例?我不能在我的项目中使用它,但不介意它的代码中的任何提示。也许只有通过实例化两个单独的侦听器容器(附加不同的回复模板)侦听同一主题才能满足这一要求?如果后一种方法是正确的,我如何确保跨多个侦听器容器正确传递消息(理想情况下使用精确一次的语义学)?
有没有办法向侦听器容器提供多个回复模板,以便根据消息值类型动态选择?
号码
Spring 引导自动配置是否以某种方式尝试解决此类用例?
号码
然而,当返回类型是<代码>消息时
/**
* Send a message with routing information in message headers. The message payload
* may be converted before sending.
* @param message the message to send.
* @return a Future for the {@link SendResult}.
* @see org.springframework.kafka.support.KafkaHeaders#TOPIC
* @see org.springframework.kafka.support.KafkaHeaders#PARTITION
* @see org.springframework.kafka.support.KafkaHeaders#KEY
*/
ListenableFuture<SendResult<K, V>> send(Message<?> message);
泛型类型只有在使用接受键和/或值的发送方法时才有意义。
您可以创建Kafka模板
的子类并覆盖发送方法以使用适当的泛型类型调用委托模板,但这将毫无意义,除非您直接返回Fo响应
或Bar响应
,而不是将它们组装成消息
服务器部件: 客户部分:io.js 消息组件 信息形式——发布过程的开始
我是JComboBox的新手 我有4个JComboBox:专用、etudiant、annee和semestre。 每次更改所选项目并将结果添加到滚动窗格(groupe des matieres ouvertes)时,我都需要从其中的4个项目中获取所选项目
我有一个关于正确配置kafka侦听器属性的问题-侦听器和advertised.listers。 在我的配置中,我设置了以下道具: 客户端使用 进行连接。我是否需要在侦听器和广告侦听器中具有相同的值。这里 是指向运行 kafka 代理的主机的 dns 记录。 在什么情况下,我希望它们保持不变和不同? 谢谢!
要运行Kafka,需要在文件。有两种设置我不理解。 有人可以解释侦听器和广告侦听器属性之间的区别吗? 留档说: 侦听器:套接字服务器侦听的地址。 和 advertised.listeners:主机名和端口代理将向生产者和消费者做广告。 我什么时候必须使用哪个设置?
我想知道如何在一个键事件中按下所有的键。例如,我想为Ctrl+F编写一个监听器,它可以切换全屏。如何检查在一个事件中是否同时按下了Ctrl和F?
我正在使用版本来使用来自主题的消息。在使用者配置中,自动提交设置为,而设置为。与服务器协商为10秒。 在收到消息后,我将它的一部分保存到数据库中。我的数据库有时会非常慢,这会导致kafka侦听器会话超时: 组MyGroup得自动偏移量提交失败:无法完成提交,因为组已重新平衡并将分区分配给另一个成员.这意味着对poll()的后续调用之间的时间比配置的session.timeout.ms长,这通常意味