当前位置: 首页 > 知识库问答 >
问题:

Kafka事务读取promise使用者

严升
2023-03-14

我有事务性的和正常的生产者在应用程序,是写到主题Kafka-主题如下。

事务性Kafka生产者的配置

@Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        /*The amount of time to wait before attempting to retry a failed request to a given topic partition. 
         * This avoids repeatedly sending requests in a tight loop under some failure scenarios.*/
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3);
        /*"The configuration controls the maximum amount of time the client will wait "
         "for the response of a request. If the response is not received before the timeout "
         "elapses the client will resend the request if necessary or fail the request if "
         "retries are exhausted.";.*/
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);
        /*To avoid duplicate msg*/
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        /*Will wait for ack from broker n all replicas*/
        props.put(ProducerConfig.ACKS_CONFIG, "all");
/*Kafka Transactional Properties */
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id"); // set transaction id
        return props;
    }

    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        return new KafkaProducer<>(producerConfigs());
    }

普通生产者配置相同,只有ProducerConfig.client_id_config和ProducerConfig.Transactional_id_config未添加

使用者配置如下

@Bean
    public Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<>();
        //list of host:port pairs used for establishing the initial connections to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //allows a pool of processes to divide the work of consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka_group");
        //automatically reset the offset to the earliest offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //Auto commit is set false.Will do manual commit
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        /*Kafka Transactional Property ->Controls how to read messages written transactionally
         * read_committed - poll transactional messages which have been committed only
         * read_uncommitted - will return all messages, even transactional messages
         * default is read_uncommitted
         * */
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {

        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

因为我将isolation.level设置为read_committed,所以它应该只使用来自订阅主题的事务性消息。但它是否在使用来自主题的事务性和非事务性消息。Do I缺少任何配置,以便使用者仅使用来自订阅主题事务性消息。提前致谢:-)

共有1个答案

史逸春
2023-03-14

不是那样的。isolation.level仅涉及事务生成器提交的记录。所有消费者都看到非事务性生产者发布的记录。

你需要使用两个不同的主题来获得你想要的行为。

 类似资料:
  • 我试图理解Kafka的事务性API。此链接定义原子读-进程-写周期如下: 首先,让我们考虑原子读-进程-写周期是什么意思。简而言之,它意味着如果应用程序在某个主题分区tp0的偏移量X处消耗消息A,并在对消息A进行一些处理后将消息B写入主题分区tp1,使得B=F(A),那么只有当消息A和B被认为成功消耗并一起发布或根本不发布时,读-进程-写周期才是原子的。 它还说: 使用为至少一次交付语义配置的va

  • 据我所知,SQL事务在写查询的情况下特别有用,即当有多个涉及< code>write查询的并发事务时,如果另一个事务已经在它之前提交,一个事务将在< code>commit上得到错误,导致第一个事务可能没有更新的数据。以便它不会盲目地更新数据,也不会忘记另一个事务所做的修改。 我还了解数据库的“原子性”方面,一组操作将是原子的,在执行过程中,要么所有操作都必须成功执行,要么如果发生任何错误,则完全

  • 问题内容: 我正在尝试使用PySpark 2.4.0从Kafka读取avro消息。 spark-avro外部模块可以为读取avro文件提供以下解决方案: 但是,我需要阅读流式Avro消息。库文档建议使用 from_avro() 函数,该函数仅适用于Scala和Java。 是否有其他模块支持读取从Kafka流式传输的Avro消息? 问题答案: 您可以包括spark-avro软件包,例如使用(调整版本

  • 我使用的是Spring-Kafka2.2.2.release(org.apache.kafka:kafka-clients:jar:2.0.1)和spring-boot(2.1.1)。我无法执行事务,因为我的侦听器无法获得分配的分区。我只为一个消费者创建了建议的配置。我正在尝试配置一个事务性侦听器容器,并且只处理一次 我使用事务管理器配置了生产者和使用者,生产者使用事务id,使用者使用isolat

  • 我正在迁移一个Kafka Streams实现,它使用纯Kafka apis来使用sping-kafka,因为它被合并在sping-引导应用程序中。 一切都很好Stream,GlobalKtable,分支,我所有的工作都非常好,但我很难合并ReadOnlyKeyValueStore。基于这里的sping-kafka留档:https://docs.spring.io/spring-kafka/docs

  • 我正在尝试使用spark阅读Kafka,但我想我会遇到一些图书馆相关的问题。 线程“main”org.apache.spark.sql.AnalysisException中出现异常:找不到数据源:Kafka。请按照“结构化流媒体+Kafka集成指南”的部署部分部署应用程序。;在org.apache.spark.sql.execution.datasources.datasource$.lookup