以下是我的Kafka消费者属性:-
Properties props = new Properties();
props.put("bootstrap.servers", "10.2.200.15:9092");
String consumeGroup = "cg3";
props.put("group.id", consumeGroup);
// Below is a key setting to turn off the auto commit.
props.put("enable.auto.commit", "false");
props.put("heartbeat.interval.ms", "2000");
props.put("session.timeout.ms", "6001");
// Control maximum data on each poll, make sure this value is bigger than the
// maximum // single message size
props.put("max.partition.fetch.bytes", "140");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("isolation.level","read_committed");
Kafka Producer在其属性中有一个事务id,在推送一些消息之后,它将事务作为一个整体提交。以下是Kafka制作人的属性:-
log.info(“初始化属性”);属性道具=新属性();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getenv(KafkaConstants.KAFKA_URL));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put("linger.ms", 1000);
props.put("acks", "all");
// props.put("request.timeout.ms",30000);
props.put("retries", 3);
props.put("retry.backoff.ms", 1000);
props.put("max.in.flight.requests.per.connection", 1); // if its greater than 1, it can change the order or records. Maximum no. of unacknowledge request a client can send.
props.put("enable.idempotence", true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"Transaction8");
public boolean send(ProducerRecordImpl record) {
try {
producer.beginTransaction();
for (int i = 0; i < 10; i++) {
Future<RecordMetadata> futureResult = producer
.send(new ProducerRecord<String, String>(record.getTopic(), record.getPayload()));
/*
* It will wait till the thread execution completes and return true.
*/
//RecordMetadata ack = futureResult.get();
//log.debug("RecordMetadta offset {} and partiton {} ", ack.offset(), ack.partition());
}
producer.commitTransaction();
log.info("Commited");
return true;
我无法理解是否commit没有从生产者端正确地发生,从而导致Kafka消费者无法用事务性语义读取它,或者Kafka消费者端存在问题。
任何帮助都将不胜感激。
您需要首先调用Producer.initTransactions()。否则,您的生产者不会发布事务性消息。
摘自https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/kafkaproducer.html#inittransactions()
在配置中设置transactional.id时,需要在任何其他方法之前调用。该方法做了以下几个方面的工作:1。确保由具有相同transactional.id的生产者的先前实例启动的任何事务都已完成。如果前一个实例在处理事务时失败,它将被中止。如果最后一个事务已开始完成,但尚未完成,则此方法等待其完成。2.获取内部生产者id和纪元,在生产者发出的所有未来事务性消息中使用。
我有事务性的和正常的生产者在应用程序,是写到主题Kafka-主题如下。 事务性Kafka生产者的配置 普通生产者配置相同,只有ProducerConfig.client_id_config和ProducerConfig.Transactional_id_config未添加。 使用者配置如下 因为我将isolation.level设置为read_committed,所以它应该只使用来自订阅主题的事务
我试图理解Kafka的事务性API。此链接定义原子读-进程-写周期如下: 首先,让我们考虑原子读-进程-写周期是什么意思。简而言之,它意味着如果应用程序在某个主题分区tp0的偏移量X处消耗消息A,并在对消息A进行一些处理后将消息B写入主题分区tp1,使得B=F(A),那么只有当消息A和B被认为成功消耗并一起发布或根本不发布时,读-进程-写周期才是原子的。 它还说: 使用为至少一次交付语义配置的va
我试图将WSO2配置为使用来自ActiveMQ的消息,并在处理过程中引发错误时requeue(通常是当对远程web服务的调用失败时)。 Activemq在Axis2.xml中配置,属性transport.jms.SessionTransact设置为true。 当远程URL的格式无效(例如使用错误的协议)时,JMS回滚/重新传递/[重定向到死信]特性可以正常工作。但是,如果我停止远程web服务器,或
我有一个使用Spring Boot相关项目的项目。我想在项目中使用Kafka消费者和生产者的Transactional功能。我需要尽可能高效地在Kafka中生成大量消息。所以我需要一个多线程消费和生产来满足这个要求。如何使用Spring boot开发多线程消费者和生产者?
问题内容: 我正在尝试使用pyodbc创建一个SQL Server数据库。 失败并显示此错误 多语句事务中不允许使用CREATE DATABASE语句 它失败,因为该方法启动了事务,并且无法在事务内运行。 那么还有其他方法可以使用python执行命令吗? 问题答案: 建立连接时,pyodbc的默认设置符合Python的DB- API规范。因此,当执行第一个SQL语句时,ODBC将开始有效的数据库事
我使用的是Spring-Kafka2.2.2.release(org.apache.kafka:kafka-clients:jar:2.0.1)和spring-boot(2.1.1)。我无法执行事务,因为我的侦听器无法获得分配的分区。我只为一个消费者创建了建议的配置。我正在尝试配置一个事务性侦听器容器,并且只处理一次 我使用事务管理器配置了生产者和使用者,生产者使用事务id,使用者使用isolat