通过 allowNon 事务属性,可以使用相同的 KafkaTemplate 来创建事务性和非事务性生产者。这两种生产者将使用来自Kafka生产者工厂的相同配置。
在某些特定情况下,可能需要为这两种生产者使用不同的配置。
下面的例子展示了一个具体的用例
问题可以通过拥有2个工厂和/或2个模板来解决,但为了避免样板代码和复杂性,我们的想法是保持相同的模板。
目前有没有支持这种用例的方法?
如果没有,有个支持就太好了。通过tx前缀,可以覆盖事务性生产者的默认配置条目。
如果您对这些问题有任何反馈,我们将不胜感激。
同意你的观点,对于大多数用例来说,这已经足够复杂了。
在我们的用例中,一些层构建在KafkaTemplate之上,该模板是隐藏的,从用户的角度来看可以认为是内部的。解决方案可能是在执行操作时检查我们是否在事务中。这是KafkaTemplate#inTransaction已经给出的内容
为了避免这种情况,可以在DefaultProducerFactory中进行轻微的内部返工,如下所示
// private -> protected
// new rawConfigs parameters
protected CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
BiPredicate<CloseSafeProducer<K, V>, Duration> remover, Map<String, Object> rawConfigs) {
// same as now
}
这样做将提供一种在需要时自定义配置的方法。
出于一致性原因,也可以对创建KafkaProducer(已受保护,只是一个新的参数)方法执行相同的操作。
如果你同意的话,我会非常乐意打开一个功能请求并为其删除一个PR。如果没有,我们可以考虑回答的问题
再次感谢您的快速回复。
已经有一个允许重写生成器工厂配置的构造函数。
/**
* Create an instance using the supplied producer factory and autoFlush setting.
* <p>
* Set autoFlush to {@code true} if you have configured the producer's
* {@code linger.ms} to a non-default value and wish send operations on this template
* to occur immediately, regardless of that setting, or if you wish to block until the
* broker has acknowledged receipt according to the producer's {@code acks} property.
* If the configOverrides is not null or empty, a new
* {@link ProducerFactory} will be created using
* {@link org.springframework.kafka.core.ProducerFactory#copyWithConfigurationOverride(java.util.Map)}
* The factory shall apply the overrides after the supplied factory's properties.
* The {@link org.springframework.kafka.core.ProducerPostProcessor}s from the
* original factory are copied over to keep instrumentation alive.
* Registered {@link org.springframework.kafka.core.ProducerFactory.Listener}s are
* also added to the new factory. If the factory implementation does not support
* the copy operation, a generic copy of the ProducerFactory is created which will
* be of type
* DefaultKafkaProducerFactory.
* @param producerFactory the producer factory.
* @param autoFlush true to flush after each send.
* @param configOverrides producer configuration properties to override.
* @since 2.5
* @see Producer#flush()
*/
public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush,
@Nullable Map<String, Object> configOverrides) {
我正在使用一个非事务性生产者,并试图理解如何处理成功/失败场景的回调。 对于一个成功的发送,我看到回调由kafka-producer-network-thread线程执行(“send ok”消息)。 发送消息成功-kafka-producer-network-thread 00:59:17.522
CustomerDao.InsertCustomer调用回滚,但仍然发送了kafka消息。如果在customer事件上有一个使用者,该事件将客户插入数据仓库,则在转换回滚时,数据仓库和记录系统将不同步。有没有办法让Kafka活页夹在这里是事务性的?
我从源主题收到一条消息。然后我将消息分成3个部分,并将每个部分发送到3个不同的主题。现在有2条消息成功传递到第2个主题。但是在发送第3条消息时,我们会收到异常(例如ProducerFencedException|OutOfOrderSequenceException|AuthorizationException|RecordLengthException) 如何回滚/还原前2个主题中的其他2条消息
问题内容: 在persistence.xml JPA配置文件中,可以有如下一行: 或有时: 我的问题是: 和之间有什么区别? 我还注意到缺少事务类型的一些persistence.xml文件。这是正确的吗? 问题答案: 默认值 在JavaEE环境中,默认值为 JTA ;在JavaSE环境中,默认值为 RESOURCE_LOCAL 。 RESOURCE_LOCAL 与您一起负责()创建和跟踪 您必须使
对于已由侦听器容器启动的事务,我们需要为所有应用实例设置相同的事务id前缀。对于仅生产事务,我们需要为每个实例设置不同的值。 我在应用程序中使用了Spring Cloud Stream Kafka活页夹,它既有事务类型,也有属性Spring。云流动Kafka。粘合剂交易事务id前缀用于创建公共事务管理器。 我想知道如何使这一切正常工作,因为似乎你不能同时拥有这两种方式。
我正在查看一些现有的代码,并想知道在下面的场景中使用Spring的@Transactional注释会发生什么?考虑以下示例: 下面的updateDataBaseItem()方法是常见的,可以从其他非事务性方法和上面的方法调用: