当前位置: 首页 > 知识库问答 >
问题:

对于事务性和非事务性Kafka生产者,具有不同配置的相同KafkaTemplate

苏培
2023-03-14

通过 allowNon 事务属性,可以使用相同的 KafkaTemplate 来创建事务性和非事务性生产者。这两种生产者将使用来自Kafka生产者工厂的相同配置。

在某些特定情况下,可能需要为这两种生产者使用不同的配置。

下面的例子展示了一个具体的用例

  • KafkaTemplate-非事务性生产者-

问题可以通过拥有2个工厂和/或2个模板来解决,但为了避免样板代码和复杂性,我们的想法是保持相同的模板。

目前有没有支持这种用例的方法?

如果没有,有个支持就太好了。通过tx前缀,可以覆盖事务性生产者的默认配置条目。

  • KafkaTemplate-非事务性生产者-

如果您对这些问题有任何反馈,我们将不胜感激。

共有2个答案

钱黎明
2023-03-14

同意你的观点,对于大多数用例来说,这已经足够复杂了。

在我们的用例中,一些层构建在KafkaTemplate之上,该模板是隐藏的,从用户的角度来看可以认为是内部的。解决方案可能是在执行操作时检查我们是否在事务中。这是KafkaTemplate#inTransaction已经给出的内容

为了避免这种情况,可以在DefaultProducerFactory中进行轻微的内部返工,如下所示

// private -> protected
// new rawConfigs parameters
protected CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
        BiPredicate<CloseSafeProducer<K, V>, Duration> remover, Map<String, Object> rawConfigs) {
    // same as now
}

这样做将提供一种在需要时自定义配置的方法。

出于一致性原因,也可以对创建KafkaProducer(已受保护,只是一个新的参数)方法执行相同的操作。

如果你同意的话,我会非常乐意打开一个功能请求并为其删除一个PR。如果没有,我们可以考虑回答的问题

再次感谢您的快速回复。

薛烈
2023-03-14

已经有一个允许重写生成器工厂配置的构造函数。

    /**
     * Create an instance using the supplied producer factory and autoFlush setting.
     * <p>
     * Set autoFlush to {@code true} if you have configured the producer's
     * {@code linger.ms} to a non-default value and wish send operations on this template
     * to occur immediately, regardless of that setting, or if you wish to block until the
     * broker has acknowledged receipt according to the producer's {@code acks} property.
     * If the configOverrides is not null or empty, a new
     * {@link ProducerFactory} will be created using
     * {@link org.springframework.kafka.core.ProducerFactory#copyWithConfigurationOverride(java.util.Map)}
     * The factory shall apply the overrides after the supplied factory's properties.
     * The {@link org.springframework.kafka.core.ProducerPostProcessor}s from the
     * original factory are copied over to keep instrumentation alive.
     * Registered {@link org.springframework.kafka.core.ProducerFactory.Listener}s are
     * also added to the new factory. If the factory implementation does not support
     * the copy operation, a generic copy of the ProducerFactory is created which will
     * be of type
     * DefaultKafkaProducerFactory.
     * @param producerFactory the producer factory.
     * @param autoFlush true to flush after each send.
     * @param configOverrides producer configuration properties to override.
     * @since 2.5
     * @see Producer#flush()
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush,
            @Nullable Map<String, Object> configOverrides) {
 类似资料:
  • 我正在使用一个非事务性生产者,并试图理解如何处理成功/失败场景的回调。 对于一个成功的发送,我看到回调由kafka-producer-network-thread线程执行(“send ok”消息)。 发送消息成功-kafka-producer-network-thread 00:59:17.522

  • CustomerDao.InsertCustomer调用回滚,但仍然发送了kafka消息。如果在customer事件上有一个使用者,该事件将客户插入数据仓库,则在转换回滚时,数据仓库和记录系统将不同步。有没有办法让Kafka活页夹在这里是事务性的?

  • 我从源主题收到一条消息。然后我将消息分成3个部分,并将每个部分发送到3个不同的主题。现在有2条消息成功传递到第2个主题。但是在发送第3条消息时,我们会收到异常(例如ProducerFencedException|OutOfOrderSequenceException|AuthorizationException|RecordLengthException) 如何回滚/还原前2个主题中的其他2条消息

  • 问题内容: 在persistence.xml JPA配置文件中,可以有如下一行: 或有时: 我的问题是: 和之间有什么区别? 我还注意到缺少事务类型的一些persistence.xml文件。这是正确的吗? 问题答案: 默认值 在JavaEE环境中,默认值为 JTA ;在JavaSE环境中,默认值为 RESOURCE_LOCAL 。 RESOURCE_LOCAL 与您一起负责()创建和跟踪 您必须使

  • 对于已由侦听器容器启动的事务,我们需要为所有应用实例设置相同的事务id前缀。对于仅生产事务,我们需要为每个实例设置不同的值。 我在应用程序中使用了Spring Cloud Stream Kafka活页夹,它既有事务类型,也有属性Spring。云流动Kafka。粘合剂交易事务id前缀用于创建公共事务管理器。 我想知道如何使这一切正常工作,因为似乎你不能同时拥有这两种方式。

  • 我正在查看一些现有的代码,并想知道在下面的场景中使用Spring的@Transactional注释会发生什么?考虑以下示例: 下面的updateDataBaseItem()方法是常见的,可以从其他非事务性方法和上面的方法调用: