当前位置: 首页 > 知识库问答 >
问题:

交易Kafka制作人

欧阳绪
2023-03-14

我想让我的Kafka制作人变得富有交易性。我正在发送10条消息。如果发生任何错误,则不应向Kafka发送任何消息,即无或全部。

我使用的是Spring Boot KafkaTemplate。

@Configuration
@EnableKafka
public class KakfaConfiguration {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();

        // props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
        // appProps.getJksLocation());
        // props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
        // appProps.getJksPassword());
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.ACKS_CONFIG, acks);
        config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackOffMsConfig);
        config.put(ProducerConfig.RETRIES_CONFIG, retries);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-99");

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean(name = "ktm")
    public KafkaTransactionManager kafkaTransactionManager() {
        KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory());
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }

}

我正在发送文件中提到的10条信息,如下所示。应发送9条消息,且I消息大小超过1MB,由于RecordTooLargeException

https://docs.spring.io/spring-kafka/reference/html/#using-Kafka塔拉行动经理

@Component
@EnableTransactionManagement
class Sender {

    @Autowired
    private KafkaTemplate<String, String> template;

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Transactional("ktm")
    public void sendThem(List<String> toSend) throws InterruptedException {
        List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
        CountDownLatch latch = new CountDownLatch(toSend.size());
        ListenableFutureCallback<SendResult<String, String>> callback = new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOG.info(" message sucess : " + result.getProducerRecord().value());
                latch.countDown();
            }

            @Override
            public void onFailure(Throwable ex) {
                LOG.error("Message Failed ");
                latch.countDown();
            }
        };

        toSend.forEach(str -> {
            ListenableFuture<SendResult<String, String>> future = template.send("t_101", str);
            future.addCallback(callback);
        });

        if (latch.await(12, TimeUnit.MINUTES)) {
            LOG.info("All sent ok");
        } else {
            for (int i = 0; i < toSend.size(); i++) {
                if (!futures.get(i).isDone()) {
                    LOG.error("No send result for " + toSend.get(i));
                }
            }
        }

但是当我看到主题t_hello_world 9时,信息就出现了。我的期望是看到0条消息,因为我的生产者是事务性的。我怎样才能做到呢?

我得到以下日志

2020-04-30 18:04:36.036 ERROR 18688 --- [   scheduling-1] o.s.k.core.DefaultKafkaProducerFactory   : commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1eb5a312, txId=prod-990]

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:923) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:297) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1013) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:296) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:713) ~[kafka-clients-2.4.1.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java



Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

2020-04-30 18:04:36.037  WARN 18688 --- [   scheduling-1] o.s.k.core.DefaultKafkaProducerFactory   : Error during transactional operation; producer removed from cache; possible cause: broker restarted during transaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1eb5a312, txId=prod-990]
2020-04-30 18:04:36.038  INFO 18688 --- [   scheduling-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-prod-990, transactionalId=prod-990] Closing the Kafka producer with timeoutMillis = 5000 **ms.
2020-04-30 18:04:36.038  INFO 18688 --- [oducer-prod-990] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-990, transactionalId=prod-990] Aborting incomplete transaction due to shutdown**

共有1个答案

梅安平
2023-03-14

将未提交的记录写入日志;当事务提交或回滚时,会向日志中写入一条额外的记录,其中包含事务的状态。

默认情况下,使用者可以查看所有记录,包括未提交的记录(但不包括特殊的提交/中止记录)。

对于控制台消费者,您需要将隔离级别设置为read_committed。查看帮助:

--isolation-level <String>           Set to read_committed in order to      
                                       filter out transactional messages    
                                       which are not committed. Set to      
                                       read_uncommitted to read all          
                                       messages. (default: read_uncommitted)
 类似资料:
  • 我试图用事务性生产者/消费者来准确地理解Kafka。 我遇到了下面的例子。但是,我还是很难准确地理解一次。这个代码正确吗? 制作人sendOffsetsToTransaction-此代码的作用是什么?这是否应该针对同一个目标主题? 什么是消费者之前的系统崩溃。commitSync();//将再次读取相同的消息并生成重复消息?

  • Kafka为每条消息生成偏移量。假设,我正在生成消息5,偏移量将从1到5。 但是,在事务生产者中,比如说,我产生了5条消息并提交,然后是5条消息但中止,然后是5条消息提交。 > 那么,最后提交的5条消息的偏移量是6到10还是11到15? 如果我不放弃或不promise呢。这些信息还会被发布吗? Kafka是如何忽略未promise的补偿的?因此,kafka提交日志是基于偏移量的。它是否使用事务使用

  • 交易机制 当你用一些以太币Ether创建了一个有效的帐户时,你可以使用两种机制来与以太坊进行交易。 通过以太坊ethereum客户端进行认证签名交易 离线交易签名认证 这两种机制都是Web3j所支持的。

  • 问题内容: 每当我要保留任何实体时,都会执行以下代码。看起来一切正常,但我不明白它是如何工作的! 上面的EntityManager是整个应用程序共享的单个实例。开始交易后;我只是说em.persist(entity)..hibernate如何知道它属于哪个事务! 假设我的应用程序上有10个并发用户,并且所有10个线程都在执行上述代码。因此,正在创建和提交10个独立的事务。但是我并没有将所有10个不

  • 我想使用spring cloud stream framework创建一个kafkaendpoint,它将有一个http post api到。如何动态更改属性 我可以使用实现来实现上述功能,但不知道是否有可能在Spring中开发此功能。

  • 交易 为了与Infura节点进行交易,需要在发送它们之前离线创建交易和签名,因为Infura节点没有加密的以太坊密钥文件的访问权限,这是需要通过geth或者Parity管理命令来解锁帐户。 有关详细信息,请参阅以太坊交易中离线交易和签名部分和web3j如何使用管理APIs。