当前位置: 首页 > 知识库问答 >
问题:

Kafka交易型生产者与消费者

水瀚漠
2023-03-14

我正在处理一个kafka用例,在这个用例中,我需要在生产者和消费者端具有事务性语义...我可以使用kafka transaction API 0.11将事务性消息发布到kafka集群,但在消费者端,我面临着一个问题...我在属性文件中设置了isolation.level=read_committed但我不能使用它...我可以看到消息被使用isolation.level=read_uncommitted但这不是希望的...

生产者代码

package com.org.kafkaPro;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

import kafka.common.AuthorizationException;
import kafka.common.KafkaException;

    public class ProducerWithTx
    {


        public static void main(String args[]) throws FileNotFoundException {
            URL in = ProducerWithTx.class.getResource("producertx.properties");

            Properties props = new Properties();

            try {
                props.load(new FileReader(new File(in.getFile())));
            } catch (IOException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }


            Paymnt pay1= new Paymnt();
            pay1.setAccid(1);
            pay1.setAccountstate("y");
            pay1.setAccountzipcode(111);
            pay1.setBankid(12);
            pay1.setCreditcardtype(15);
            pay1.setCustomerid("2");
            SimpleDateFormat ft = new SimpleDateFormat ("yyyy-MM-dd");
            Date t = null;
            try {
                t = ft.parse("2017-11-10");
            } catch (ParseException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            pay1.setPeriodid(t);

            String timeStamp = new SimpleDateFormat("yyyy.MM.dd:HH:mm:ss").format(new Date());
            props.put("transactional.id", "accid=" + pay1.getAccid() +  " custid=" +pay1.getCustomerid()+  " timestmp=" +timeStamp);
            KafkaProducer<String, Paymnt> producer = new KafkaProducer(props);
            producer.initTransactions();  
            try{
                producer.beginTransaction();



                //RecordMetadata metadata=producer.send((ProducerRecord<String, Paymnt>) new ProducerRecord<String, Paymnt>("test",pay1)).get();

                producer.send((ProducerRecord<String, Paymnt>) new ProducerRecord<String, Paymnt>("test",pay1));
                producer.commitTransaction();

                //System.out.println("written to" +metadata.partition());

            }
            catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException  e){

                // We can't recover from these exceptions, so our only option is to close the producer and exit.
                producer.close();
            }
            catch(KafkaException e) {
                // For all other exceptions, just abort the transaction and try again.
                producer.abortTransaction();
            }
            producer.close();
        }

    }

ProducerTX.Properties

metadata.broker.list=localhost:9092
bootstrap.servers=localhost:9092
acks=all
retries=1
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.org.kafkaPro.PaySerializer
#transactional.id=1
enable.idempotence=true
num.partitions=3

消费者

package com.org.kafkaPro;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Consumer {

    private static List<ConsumerMultiThreaded> consumersGroup;


    public static void main(String args[]) throws IOException {


        URL in = ProducerWithTx.class.getResource("consumer.properties");

        Properties props = new Properties();

        try {
            props.load(new FileReader(new File(in.getFile())));
        } catch (IOException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

        consumersGroup=new ArrayList<ConsumerMultiThreaded>();
        ConsumerMultiThreaded con1= new ConsumerMultiThreaded(props);
        ConsumerMultiThreaded con2=new ConsumerMultiThreaded(props);
        ConsumerMultiThreaded con3=new ConsumerMultiThreaded(props);
        ConsumerMultiThreaded con4=new ConsumerMultiThreaded(props);

        consumersGroup.add(con1);
        consumersGroup.add(con2);
        consumersGroup.add(con3);
        consumersGroup.add(con4);

        for (ConsumerMultiThreaded consumer : consumersGroup) {

            Thread t=new Thread(consumer);
            t.start();

        }

        while(true){
            try {
                Thread.sleep(100000);
            } catch (InterruptedException ie) {

            }


        }
    }
}   
public class ConsumerMultiThreaded implements Runnable {

private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, Paymnt> consumer;
    private final int minBatchSize =3;
    private final List<ConsumerRecord<String, Paymnt>> buffer;


    public ConsumerMultiThreaded(Properties props){
        this.consumer= new KafkaConsumer<String, Paymnt>(props);
        buffer = new ArrayList(minBatchSize);
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Arrays.asList("test"));
            while (!closed.get()) {
                ConsumerRecords<String,Paymnt> records = consumer.poll(10000);

                for (ConsumerRecord<String, Paymnt> record : records) {
                    buffer.add(record);
                }

                /*for (ConsumerRecord<String, Paymnt> record : records)
                {
                    System.out.println("record consumed by Thread " +Thread.currentThread().getId() +" value is " +record.value().toString());
                }*/
                if(buffer.size()>=minBatchSize){
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, Paymnt>> partitionRecords = records.records(partition);
                        for (ConsumerRecord<String, Paymnt> record : partitionRecords) {
                            System.out.println("record consumed by Thread " +Thread.currentThread().getId() +"from partition" +record.partition() +"offset" +record.offset() + "value: " + record.value().toString());
                        }
                        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                        buffer.clear();
                    }
                }
            }

        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } 
        finally {
            consumer.close();
        }

    }

    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }

}
bootstrap.servers=localhost:9092
session.timeout.ms=30000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.org.kafkaPro.PayDeserializer
enable.auto.commit=false
auto.offset.reset=earliest
group.id=test
isolation.level=read_committed

感谢你的帮助..谢谢

共有1个答案

端木淇
2023-03-14

在您的生产者属性中,您使用的是#transactional.id=1(如您在问题中提到的),在此中您提到了#符号。这可能会产生问题。

如果不是这样,那么您可以转储您的主题和__transaction_state主题的日志段,从那里您可以很容易地调试出错误。

 类似资料:
  • 我们有一个基于spring boot的事务性Kafka制作人!使用的版本如下 spring-boot-starter-父-2.3.0。释放 spring-kafka-2.5.0。释放 我们的kafka(集群)版本是2.1. x! 作为生产者,我们启用了幂等性,定义了事务id前缀,并在事务中执行kafka模板调用。我们还有一个将隔离级别设置为只读的使用者! 现在我们遇到了一个行为,不知道如何推断,

  • Kafka为每条消息生成偏移量。假设,我正在生成消息5,偏移量将从1到5。 但是,在事务生产者中,比如说,我产生了5条消息并提交,然后是5条消息但中止,然后是5条消息提交。 > 那么,最后提交的5条消息的偏移量是6到10还是11到15? 如果我不放弃或不promise呢。这些信息还会被发布吗? Kafka是如何忽略未promise的补偿的?因此,kafka提交日志是基于偏移量的。它是否使用事务使用

  • 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前

  • 生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f

  • 我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断

  • 我知道什么是生产者和消费者。但官方文件显示 < li >它是流媒体平台。 < li >它是企业消息系统。 < li>Kafka具有从数据库和其他系统导入和导出数据的连接器。 这是什么意思? 我知道生产者是向Kafka Broker发送数据的客户端应用程序,消费者也是从Kafka Broker读取数据的客户端应用程序。 但我的问题是,消费者可以将数据推送到Kafka Broker吗? 据我所知,我认