当前位置: 首页 > 知识库问答 >
问题:

Kafka使用者不使用事务性语义(isolation.level=read_commited)

祁飞翰
2023-03-14

以下是我的Kafka消费者属性:-

Properties props = new Properties();
    props.put("bootstrap.servers", "10.2.200.15:9092");
    String consumeGroup = "cg3";
    props.put("group.id", consumeGroup);
    // Below is a key setting to turn off the auto commit.
    props.put("enable.auto.commit", "false");
    props.put("heartbeat.interval.ms", "2000");
    props.put("session.timeout.ms", "6001");
    // Control maximum data on each poll, make sure this value is bigger than the
    // maximum // single message size
    props.put("max.partition.fetch.bytes", "140");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("isolation.level","read_committed");

Kafka Producer在其属性中有一个事务id,在推送一些消息之后,它将事务作为一个整体提交。以下是Kafka制作人的属性:-

log.info(“初始化属性”);属性道具=新属性();

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getenv(KafkaConstants.KAFKA_URL));
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    props.put("linger.ms", 1000);
    props.put("acks", "all");
    // props.put("request.timeout.ms",30000);
    props.put("retries", 3);
    props.put("retry.backoff.ms", 1000);
    props.put("max.in.flight.requests.per.connection", 1); // if its greater than 1, it can change the order or records. Maximum no. of unacknowledge request a client can send.
    props.put("enable.idempotence", true);
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"Transaction8");
public boolean send(ProducerRecordImpl record) {
    try {
        producer.beginTransaction();
        for (int i = 0; i < 10; i++) {


            Future<RecordMetadata> futureResult = producer
                    .send(new ProducerRecord<String, String>(record.getTopic(), record.getPayload()));
            /*
             * It will wait till the thread execution completes and return true.
             */
            //RecordMetadata ack = futureResult.get();
            //log.debug("RecordMetadta offset {} and partiton {} ", ack.offset(), ack.partition());
        }
        producer.commitTransaction();
        log.info("Commited");


        return true;

我无法理解是否commit没有从生产者端正确地发生,从而导致Kafka消费者无法用事务性语义读取它,或者Kafka消费者端存在问题。

任何帮助都将不胜感激。

共有1个答案

祁鸿哲
2023-03-14

您需要首先调用Producer.initTransactions()。否则,您的生产者不会发布事务性消息。

摘自https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/kafkaproducer.html#inittransactions()

在配置中设置transactional.id时,需要在任何其他方法之前调用。该方法做了以下几个方面的工作:1。确保由具有相同transactional.id的生产者的先前实例启动的任何事务都已完成。如果前一个实例在处理事务时失败,它将被中止。如果最后一个事务已开始完成,但尚未完成,则此方法等待其完成。2.获取内部生产者id和纪元,在生产者发出的所有未来事务性消息中使用。

 类似资料:
  • 我有事务性的和正常的生产者在应用程序,是写到主题Kafka-主题如下。 事务性Kafka生产者的配置 普通生产者配置相同,只有ProducerConfig.client_id_config和ProducerConfig.Transactional_id_config未添加。 使用者配置如下 因为我将isolation.level设置为read_committed,所以它应该只使用来自订阅主题的事务

  • 我试图理解Kafka的事务性API。此链接定义原子读-进程-写周期如下: 首先,让我们考虑原子读-进程-写周期是什么意思。简而言之,它意味着如果应用程序在某个主题分区tp0的偏移量X处消耗消息A,并在对消息A进行一些处理后将消息B写入主题分区tp1,使得B=F(A),那么只有当消息A和B被认为成功消耗并一起发布或根本不发布时,读-进程-写周期才是原子的。 它还说: 使用为至少一次交付语义配置的va

  • 我试图将WSO2配置为使用来自ActiveMQ的消息,并在处理过程中引发错误时requeue(通常是当对远程web服务的调用失败时)。 Activemq在Axis2.xml中配置,属性transport.jms.SessionTransact设置为true。 当远程URL的格式无效(例如使用错误的协议)时,JMS回滚/重新传递/[重定向到死信]特性可以正常工作。但是,如果我停止远程web服务器,或

  • 我有一个使用Spring Boot相关项目的项目。我想在项目中使用Kafka消费者和生产者的Transactional功能。我需要尽可能高效地在Kafka中生成大量消息。所以我需要一个多线程消费和生产来满足这个要求。如何使用Spring boot开发多线程消费者和生产者?

  • 问题内容: 我正在尝试使用pyodbc创建一个SQL Server数据库。 失败并显示此错误 多语句事务中不允许使用CREATE DATABASE语句 它失败,因为该方法启动了事务,并且无法在事务内运行。 那么还有其他方法可以使用python执行命令吗? 问题答案: 建立连接时,pyodbc的默认设置符合Python的DB- API规范。因此,当执行第一个SQL语句时,ODBC将开始有效的数据库事

  • 我使用的是Spring-Kafka2.2.2.release(org.apache.kafka:kafka-clients:jar:2.0.1)和spring-boot(2.1.1)。我无法执行事务,因为我的侦听器无法获得分配的分区。我只为一个消费者创建了建议的配置。我正在尝试配置一个事务性侦听器容器,并且只处理一次 我使用事务管理器配置了生产者和使用者,生产者使用事务id,使用者使用isolat