当前位置: 首页 > 知识库问答 >
问题:

如何在Kafka 0.11中正确提交生产者并消费事务性消息?

楚浩然
2023-03-14

我正在尝试Kafka跨国制作人在Java。

就像

    producer.initTransactions();
    try {
        producer.beginTransaction();
        producer.send(rec, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null)
                    e.printStackTrace();
                System.out.println("The offset of the record we just sent is: " + metadata.offset());
        });
        producer.commitTransaction();
    }catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e){
        producer.close();
    }catch(KafkaException e) {
        producer.abortTransaction();
    }catch (Exception x){}
    producer.close();

它没有抛出任何错误。并且发送也在Kafka中推送消息,它是可用的。

我可以看到经纪人的日志是这样的:

[2017-10-30 19:30:56,574] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: __transaction_state-11. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2017-10-30 19:31:19,379] INFO [Transaction Coordinator 1001]: Initialized transactionalId TXN_ID:0.28508215642368573189913137 with producerId 11 and producer epoch 0 on partition __transaction_state-11 (kafka.coordinator.transaction.TransactionCoordinator)

5分钟后,我找到了这个经纪人日志。[2017-10-30 19:36:44123]信息[Broker 1001上的组元数据管理器]:在0毫秒内删除了0个过期的偏移量。(kafka.coordinator.group.GroupMetadataManager)

在这种情况下,我只能看到事务被初始化,但没有提交或其他东西的进一步日志。

在制片人配置中,我添加了

transactional.id=<some random transaction ID>
enable.idempotence=true

如上所述,请注意,如果配置了TransactionalId,则必须启用enable.idempotence。默认值为空,这意味着不能使用事务。

我在Kafka文档中找到了一条语句Producer:发送OffsetCommitRequest以提交与该事务结束相关联的输入状态

这是否意味着我必须说出我要提交的偏移量

我不确定制片人发生了什么

这是我的producer调试日志:

1180 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.6069296543148491816257436] Sending transactional request (type=InitProducerIdRequest, transactionalId=TXN_ID:0.6069296543148491816257436, transactionTimeoutMs=60000) to node 127.0.0.1:9090 (id: 1001 rack: null)
1317 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] ProducerId set to 13 with epoch 0
1317 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state INITIALIZING to READY
1317 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state READY to IN_TRANSACTION
1323 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Sending metadata request (type=MetadataRequest, topics=topic) to node -1
1337 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata version 2 to Cluster(id = 0WtNXiFvT_C6V9Uo1zPGVQ, nodes = [127.0.0.1:9090 (id: 1001 rack: null)], partitions = [Partition(topic = topic, partition = 0, leader = 1001, replicas = [1001], isr = [1001])])
1362 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Begin adding new partition topic-0 to transaction
1386 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, partitions=[topic-0])
1387 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.6069296543148491816257436] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, partitions=[topic-0]) to node 127.0.0.1:9090 (id: 1001 rack: null)
1389 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION
1392 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Enqueuing transactional request (type=EndTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, result=COMMIT)
1437 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Successfully added partitions [topic-0] to transaction
1439 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.RecordAccumulator  - Assigning sequence number 0 from producer (producerId=13, epoch=0) to dequeued batch from partition topic-0 bound for 127.0.0.1:9090 (id: 1001 rack: null).
1444 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.records-per-batch
1444 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.bytes
1445 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.compression-rate
1445 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.record-retries
1445 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.record-errors
1453 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - Incremented sequence number for topic-partition topic-0 to 1
The offset of the record we just sent is: 13
1455 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.6069296543148491816257436] Sending transactional request (type=EndTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, result=COMMIT) to node 127.0.0.1:9090 (id: 1001 rack: null)
1457 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state COMMITTING_TRANSACTION to READY

我想是的,在提交交易之前,我遗漏了一些东西。在consumer中,如果我设置READ_COMMITTED,我也无法消费。如果不是的话,它会正常工作,甚至我也会收到我使用事务生成器生成的消息。

用于读取事务消息的消费者代码

configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9090");
configProperties.put("group.id","new-group-id");
configProperties.put("enable.auto.commit", "true");
configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
configProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

和消费者正在订阅主题主题和

我的消费者调试控制台日志是:

126032 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-0 at offset 1 to node 127.0.0.1:9090 (id: 1001 rack: null)
126032 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-0] to broker 127.0.0.1:9090 (id: 1001 rack: null)
126551 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 1 for partition topic-0 returned fetch data (error=NONE, highWaterMark=17, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)

在这里,它连续重复相同的3行,我有13作为最高偏移值。在消费者中,我无法消费信息。

我有一个1节点集群的设置,我尝试了3个节点,也显示了相同的结果。

感谢您的帮助。

共有1个答案

叶德本
2023-03-14

终于对我来说,它开始起作用了。

我说不清到底是什么问题。但从我的观察来看,这是WINDOWS操作系统造成的。

如果我们的经纪人在windows机器上,它不像预期的那样工作。如果经纪人在Linux机器上,它工作正常。

Dumping 00000000000000000000.index
offset: 0 position: 0
Dumping 00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 .... transactionalId=TXN_ID:1 ... ,state=Empty,partitions=Set(),txnLastUpdateTimestamp=1510143189059,txnTimeoutMs=60000
offset: 1 position: 117 .... transactionalId=TXN_ID:1 ... ,state=Ongoing,partitions=Set(topic-0),txnLastUpdateTimestamp=1510143189232,txnTimeoutMs=60000
offset: 2 position: 250 .... transactionalId=TXN_ID:1 ... ,state=PrepareCommit,partitions=Set(topic-0),txnLastUpdateTimestamp=1510143189393,txnTimeoutMs=60000
Dumping 00000000000000000000.timeindex
timestamp: 0 offset: 0
Found timestamp mismatch in :D:\tmp\kafka-logs-0\__transaction_state-9\00000000000000000000.timeindex
Index timestamp: 0, log timestamp: 1510143189059
Index timestamp: 0, log timestamp: 1510143189059
Found out of order timestamp in :D:\tmp\kafka-logs-0\__transaction_state-9\00000000000000000000.timeindex
Index timestamp: 0, Previously indexed timestamp: 0

在上面的日志中,您可以很容易地看到事务状态为进行中准备提交。但没有提到提交/事务的完成。

但在我的producer控制台日志中,它表示为事务已完成,因此肯定存在一些问题。

offset: 0 position: 0 .... transactionalId=TXN_ID:2 ... ,state=Empty,partitions=Set(),txnLastUpdateTimestamp=1510145904630,txnTimeoutMs=60000
offset: 1 position: 117 .... transactionalId=TXN_ID:2 ... ,state=Ongoing,partitions=Set(topic-0),txnLastUpdateTimestamp=1510145904763,txnTimeoutMs=60000
offset: 2 position: 250 .... transactionalId=TXN_ID:2 ... ,state=PrepareCommit,partitions=Set(topic-0),txnLastUpdateTimestamp=1510145904931,txnTimeoutMs=60000
offset: 3 position: 383 .... transactionalId=TXN_ID:2 ... ,state=CompleteCommit,partitions=Set(),txnLastUpdateTimestamp=1510145904938,txnTimeoutMs=60000

但在这里我们可以很容易地发现,总共有4种不同的状态被提及。

持续准备提交完成提交。这实际上使事务完成。

我们也可以重复使用提交ID。

因此,正如我可以总结的那样,如果你想使用事务,现在就在Linux上而不是windows上使用Kafka代理。

 类似资料:
  • 我正在处理一个kafka用例,在这个用例中,我需要在生产者和消费者端具有事务性语义...我可以使用kafka transaction API 0.11将事务性消息发布到kafka集群,但在消费者端,我面临着一个问题...我在属性文件中设置了但我不能使用它...我可以看到消息被使用但这不是希望的... 生产者代码 ProducerTX.Properties 消费者 感谢你的帮助..谢谢

  • 我要求从主题中读取消息,对它们进行批处理,然后将批处理推送到外部系统。如果批处理因任何原因失败,我需要再次使用同一组消息并重复该过程。因此,对于每个批处理,每个分区的 from 和 to 偏移量都存储在数据库中。为了实现这一点,我通过向读取器分配分区来为每个分区创建一个Kafka使用者,基于先前存储的偏移量,使用者寻求该位置并开始读取。我已关闭自动提交,并且不提交来自使用者的偏移量。对于每个批处理

  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f

  • 所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职

  • 发布者创建reply_to队列并发布到路由密钥,其中包含一条消息,告诉消费者向队列发送响应(RPC协议),以及一个传回的相关id,以便所有未来的结果都与该唯一标识符相关联 Exchange向绑定到该路由密钥的所有队列发送消息。这里,有两个消费者的两个队列,每个都绑定到路由密钥“泵” 一段时间后,消费者回复回队列,然后确认消息,以便他们的唯一队列删除发送到其队列的消息。每个收到消息的消费者都会这样做