当前位置: 首页 > 知识库问答 >
问题:

使用Spring Kafka在一个事务中写入两个Kafka主题

杭镜
2023-03-14

然后,我对一个方法使用了@transactional(transactionManager=“chainedTX”)注释,该方法执行以下操作:

template1.send("topic1", "example payload");
template2.send("topic2", "example payload");

这不起作用。kafkatemplate是事务性的,但是当调用send()方法时,没有正在进行的事务,并且我得到一个IllegalStateException

我打算尝试kafkatemplate.executeinTransaction()方法,但Javadoc声明这只用于本地事务,因此它似乎不符合我的需要。

我的下一步是尝试直接使用Kafka的Producer API,看看这种模式是否有效,但如果有人能告诉我知道我在浪费时间,Kafka不支持事务性地写多个主题,我会很感激。

我确实在Confluent关于Kafka交易支持的博客中发现了这样的说法:

事务启用对多个Kafka主题和分区的原子写入...

第一个生产者的配置

@Configuration公共类ControlProducerConfig

@Bean("controlTransactionManager")
KafkaTransactionManager<String, String> transactionManager() {
    return  new KafkaTransactionManager<>(factory());
}

@Bean("controlTemplate")
public KafkaTemplate<String, String> template() {
    return new KafkaTemplate<>(factory());
}

private ProducerFactory<String, String> factory() {
    DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
    factory.setTransactionIdPrefix("abcd");
    return factory;
}

private Map<String, Object> config() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");

    props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

    // you can't set idempotence without setting max in flight requests to <= 5
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1234");

    return props;
}

}

@Configuration
public class PayloadProducerConfig {


@Bean("payloadTransactionManager")
KafkaTransactionManager<String, String> transactionManager() {
    return new KafkaTransactionManager<>(factory());
}

@Bean("payloadTemplate")
public KafkaTemplate<String, String> template() {
    return new KafkaTemplate<>(factory());
}

private ProducerFactory<String, String> factory() {
    DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
    factory.setTransactionIdPrefix("abcd");
    return factory;
}

private Map<String, Object> config() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");

    props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

    // you can't set idempotence without setting max in flight requests to <= 5
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1234");

    return props;
}
@EnableTransactionManagement
@SpringBootApplication
public class App {

public static void main(String[] args) {
    SpringApplication.run(App.class, args);
}

@Bean("chainedTx")
public ChainedTransactionManager chained(
    @Qualifier("controlTransactionManager") KafkaTransactionManager controlTransactionManager,
    @Qualifier("payloadTransactionManager") KafkaTransactionManager payloadTransactionManager) {

    return new ChainedTransactionManager(controlTransactionManager, payloadTransactionManager);
}

@Bean OnStart onStart(PostTwoMessages postTwoMessages) {
    return new OnStart(postTwoMessages);
}

@Bean
public PostTwoMessages postTwoMessages(
    @Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
    @Qualifier("controlTemplate") KafkaTemplate<String, String> payloadTemplate) {

    return new PostTwoMessages(controlTemplate, payloadTemplate);
}
public class OnStart implements ApplicationListener<ApplicationReadyEvent> {

private PostTwoMessages postTwoMessages;

public OnStart(PostTwoMessages postTwoMessages) {
    this.postTwoMessages = postTwoMessages;
}

@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
    postTwoMessages.run();
}

发布这两条消息

public class PostTwoMessages  {

private final KafkaTemplate<String, String> controlTemplate;
private final KafkaTemplate<String, String> payloadTemplate;

public PostTwoMessages(
    @Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
    @Qualifier("payloadTemplate") KafkaTemplate<String, String> payloadTemplate) {

    this.controlTemplate = controlTemplate;
    this.payloadTemplate = payloadTemplate;
}

@Transactional(transactionManager = "chainedTx")
public void run() {
    UUID uuid = UUID.randomUUID();
    controlTemplate.send("private.s0869y.trx.model3a", "control: " + uuid);
    payloadTemplate.send("private.s0869y.trx.model3b", "payload: " + uuid);
}

}

共有1个答案

高琛
2023-03-14

它应该起作用;是否有@enabletransactionmanagement

然而,交易不能跨越2个不同的生产者;您必须使用相同的模板进行两次发送。否则就是两个不同的交易。

编辑

@SpringBootApplication
public class So54865968Application {

    public static void main(String[] args) {
        SpringApplication.run(So54865968Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            foo.runInTx();
            System.out.println("Committed 1");
            foo.runInLocalTx();
            System.out.println("Committed 2");
        };
    }

    @Bean
    public Foo foo(KafkaTemplate<String, Object> template) {
        return new Foo(template);
    }

    @Bean
    public Bar bar() {
        return new Bar();
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("so54865968-1", 1, (short) 1);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("so54865968-2", 1, (short) 1);
    }

    public static class Foo {

        private final KafkaTemplate<String, Object> template;

        public Foo(KafkaTemplate<String, Object> template) {
            this.template = template;
        }

        @Transactional(transactionManager = "kafkaTransactionManager")
        public void runInTx() throws InterruptedException {
            this.template.send("so54865968-1", 42);
            this.template.send("so54865968-2", "texttest");
            System.out.println("Sent 2; waiting a few seconds to commit");
            Thread.sleep(5_000);
        }

        public void runInLocalTx() throws InterruptedException {
            this.template.executeInTransaction(t -> {
                t.send("so54865968-1", 43);
                t.send("so54865968-2", "texttest2");
                System.out.println("Sent 2; waiting a few seconds to commit");
                try {
                    Thread.sleep(5_000);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return true;
            });
        }

    }

    public static class Bar {

        @KafkaListener(id = "foo", topics = { "so54865968-1", "so54865968-2" })
        public void haandler(byte[] bytes) {
            if (bytes.length == 4) {
                ByteBuffer bb = ByteBuffer.wrap(bytes);
                System.out.println("Received int " + bb.getInt());
            }
            else {
                System.out.println("Received string " + new String(bytes));
            }
        }

    }

}
spring.kafka.producer.transaction-id-prefix=tx-id
spring.kafka.producer.properties.value.serializer=com.example.CompositeSerializer

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.properties.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

而且

public class CompositeSerializer implements Serializer<Object> {

    private final StringSerializer stringSerializer = new StringSerializer();

    private final IntegerSerializer intSerializer = new IntegerSerializer();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        return data instanceof Integer ? intSerializer.serialize(topic, (Integer) data)
                : stringSerializer.serialize(topic, (String) data);
    }

    @Override
    public void close() {
    }

}

而且

Received int 42
Received string texttest

两人都在5秒暂停后出现。

 类似资料:
  • 当前实施情况 我有一个Spring批量工作,写一个Kafka主题。我从数据库中读取记录,转换它们,然后写入Kafka主题。 对现有作业的新更改 我应该再写一个审计主题和主要主题。 对于从数据库中读取的每个记录,我正在将一条消息(例如类Abc类型)写入主主题,对于同一条记录,我假设将另一个实体类类型的消息写入审计主题。 问题陈述 目前,我正在使用不同的KakfaTemplate来写入这两个主题,但问

  • 我有一个Spring启动应用程序,它使用来自 Kafka 集群中某个主题(例如 topic1)的消息。这就是我的代码目前的样子。 现在我想从另一个Kafka集群中的不同主题开始消费。一种方法是为此创建另一个bean。但是有更好的方法吗?

  • 我有以下用例:将来自单个数据源的日志文件推送到Kafka主题(例如主题1)。有一个消费者将从中读取并转换为json格式并写回另一个主题(主题2)。另一个期望json中的数据将从主题2读取的消费者将进行一些其他修改并写回另一个主题(主题3)。 我的问题是,除了创建3个不同的主题之外,我能否创建一个主题,并让这些多个制作者写同一个主题?既然不能为生产者设置组id,我的消费者如何知道从哪个分区读取?我从

  • 本文向大家介绍Spring如何在一个事务中开启另一个事务,包括了Spring如何在一个事务中开启另一个事务的使用技巧和注意事项,需要的朋友参考一下 这篇文章主要介绍了Spring如何在一个事务中开启另一个事务,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 spring使用@Transactional开启事务,而且该注解使用propagation属

  • 我想使用Kafka Connect通过CDC检测Postgres DB上一组表的更改,并将它们作为单个主题中的消息推送,其中键作为主表的逻辑键。 这将使使用者能够以正确的顺序使用数据更改,以便将其应用于目标数据库。 是否有源和汇连接器允许我实现这个目标? 我正在使用Debezium CDC源连接器进行Postgres。。。我可以将其配置为将所有表的所有消息路由到一个主题中。 但是,我无法找到能够使

  • 我对微服务最佳实践方法有点困惑。 以下场景: 来自mqtt设备的大量传入消息。一个rest api,客户可以在其中读取消息(大部分只是其中的一部分)。 我的想法是,创建一个用于将消息存储在数据库表中的微服务。还有第二个带有rest api的微服务来读取这些消息。我想这样做,因为缩放问题。(传入的存储部分比读取rest api需要更多的电力) 我读到“完美”的微服务应该是唯一一个在数据库中访问其数据