我正在处理一个kafka用例,在这个用例中,我需要在生产者和消费者端具有事务性语义...我可以使用kafka transaction API 0.11将事务性消息发布到kafka集群,但在消费者端,我面临着一个问题...我在属性文件中设置了isolation.level=read_committed
但我不能使用它...我可以看到消息被使用isolation.level=read_uncommitted
但这不是希望的...
生产者代码
package com.org.kafkaPro;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import kafka.common.AuthorizationException;
import kafka.common.KafkaException;
public class ProducerWithTx
{
public static void main(String args[]) throws FileNotFoundException {
URL in = ProducerWithTx.class.getResource("producertx.properties");
Properties props = new Properties();
try {
props.load(new FileReader(new File(in.getFile())));
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
Paymnt pay1= new Paymnt();
pay1.setAccid(1);
pay1.setAccountstate("y");
pay1.setAccountzipcode(111);
pay1.setBankid(12);
pay1.setCreditcardtype(15);
pay1.setCustomerid("2");
SimpleDateFormat ft = new SimpleDateFormat ("yyyy-MM-dd");
Date t = null;
try {
t = ft.parse("2017-11-10");
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
pay1.setPeriodid(t);
String timeStamp = new SimpleDateFormat("yyyy.MM.dd:HH:mm:ss").format(new Date());
props.put("transactional.id", "accid=" + pay1.getAccid() + " custid=" +pay1.getCustomerid()+ " timestmp=" +timeStamp);
KafkaProducer<String, Paymnt> producer = new KafkaProducer(props);
producer.initTransactions();
try{
producer.beginTransaction();
//RecordMetadata metadata=producer.send((ProducerRecord<String, Paymnt>) new ProducerRecord<String, Paymnt>("test",pay1)).get();
producer.send((ProducerRecord<String, Paymnt>) new ProducerRecord<String, Paymnt>("test",pay1));
producer.commitTransaction();
//System.out.println("written to" +metadata.partition());
}
catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e){
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
}
catch(KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
}
}
ProducerTX.Properties
metadata.broker.list=localhost:9092
bootstrap.servers=localhost:9092
acks=all
retries=1
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.org.kafkaPro.PaySerializer
#transactional.id=1
enable.idempotence=true
num.partitions=3
消费者
package com.org.kafkaPro;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class Consumer {
private static List<ConsumerMultiThreaded> consumersGroup;
public static void main(String args[]) throws IOException {
URL in = ProducerWithTx.class.getResource("consumer.properties");
Properties props = new Properties();
try {
props.load(new FileReader(new File(in.getFile())));
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
consumersGroup=new ArrayList<ConsumerMultiThreaded>();
ConsumerMultiThreaded con1= new ConsumerMultiThreaded(props);
ConsumerMultiThreaded con2=new ConsumerMultiThreaded(props);
ConsumerMultiThreaded con3=new ConsumerMultiThreaded(props);
ConsumerMultiThreaded con4=new ConsumerMultiThreaded(props);
consumersGroup.add(con1);
consumersGroup.add(con2);
consumersGroup.add(con3);
consumersGroup.add(con4);
for (ConsumerMultiThreaded consumer : consumersGroup) {
Thread t=new Thread(consumer);
t.start();
}
while(true){
try {
Thread.sleep(100000);
} catch (InterruptedException ie) {
}
}
}
}
public class ConsumerMultiThreaded implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer<String, Paymnt> consumer;
private final int minBatchSize =3;
private final List<ConsumerRecord<String, Paymnt>> buffer;
public ConsumerMultiThreaded(Properties props){
this.consumer= new KafkaConsumer<String, Paymnt>(props);
buffer = new ArrayList(minBatchSize);
}
@Override
public void run() {
try {
consumer.subscribe(Arrays.asList("test"));
while (!closed.get()) {
ConsumerRecords<String,Paymnt> records = consumer.poll(10000);
for (ConsumerRecord<String, Paymnt> record : records) {
buffer.add(record);
}
/*for (ConsumerRecord<String, Paymnt> record : records)
{
System.out.println("record consumed by Thread " +Thread.currentThread().getId() +" value is " +record.value().toString());
}*/
if(buffer.size()>=minBatchSize){
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, Paymnt>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, Paymnt> record : partitionRecords) {
System.out.println("record consumed by Thread " +Thread.currentThread().getId() +"from partition" +record.partition() +"offset" +record.offset() + "value: " + record.value().toString());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
buffer.clear();
}
}
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
}
finally {
consumer.close();
}
}
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
bootstrap.servers=localhost:9092
session.timeout.ms=30000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.org.kafkaPro.PayDeserializer
enable.auto.commit=false
auto.offset.reset=earliest
group.id=test
isolation.level=read_committed
感谢你的帮助..谢谢
在您的生产者属性中,您使用的是#transactional.id=1
(如您在问题中提到的),在此中您提到了#
符号。这可能会产生问题。
如果不是这样,那么您可以转储您的主题和__transaction_state主题的日志段,从那里您可以很容易地调试出错误。
我们有一个基于spring boot的事务性Kafka制作人!使用的版本如下 spring-boot-starter-父-2.3.0。释放 spring-kafka-2.5.0。释放 我们的kafka(集群)版本是2.1. x! 作为生产者,我们启用了幂等性,定义了事务id前缀,并在事务中执行kafka模板调用。我们还有一个将隔离级别设置为只读的使用者! 现在我们遇到了一个行为,不知道如何推断,
Kafka为每条消息生成偏移量。假设,我正在生成消息5,偏移量将从1到5。 但是,在事务生产者中,比如说,我产生了5条消息并提交,然后是5条消息但中止,然后是5条消息提交。 > 那么,最后提交的5条消息的偏移量是6到10还是11到15? 如果我不放弃或不promise呢。这些信息还会被发布吗? Kafka是如何忽略未promise的补偿的?因此,kafka提交日志是基于偏移量的。它是否使用事务使用
向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前
生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f
我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断
我知道什么是生产者和消费者。但官方文件显示 < li >它是流媒体平台。 < li >它是企业消息系统。 < li>Kafka具有从数据库和其他系统导入和导出数据的连接器。 这是什么意思? 我知道生产者是向Kafka Broker发送数据的客户端应用程序,消费者也是从Kafka Broker读取数据的客户端应用程序。 但我的问题是,消费者可以将数据推送到Kafka Broker吗? 据我所知,我认