对我来说,Kafka事务制作人的行为就像一个普通的制作人,每次消息调用send时,在主题上都可以看到Meesage。也许我错过了一些基本的东西。我希望只有在调用producer commit方法之后,消息才会出现在主题中。在我下面的代码中。commitTransactions()已被注释掉,但我仍能收到主题中的消息。谢谢你的指点。
public static void main(String[] args) {
try {
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer-1");
producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // enable idempotence
producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id-1"); // set transaction id
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(producerConfig);
producer.initTransactions(); //initiate transactions
try {
producer.beginTransaction(); //begin transactions
for (Integer i = 0; i < 1000; i++) {
producer.send(new ProducerRecord<String, String>("t_test", i.toString(), "value_" + i));
}
// producer.commitTransaction(); //commit
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
} catch (Exception e) {
System.out.println(e.toString());
}
}
当涉及到交易在Kafka你需要考虑生产者/消费者对。正如您所观察到的,生产者本身只是产生数据,要么提交事务,要么不提交事务。
只有在与消费者交互时,您才能通过将Kafka消费者配置isolation.level
设置为read_committed
(默认情况下设置为read_uncommitted
)来完成事务。此配置描述为:
隔离级别:控制如何读取以事务方式写入的消息。如果设置为read_committed,则消费者。poll()将仅返回已提交的事务性消息。如果设置为read_uncommitted(默认值),则消费者。poll()将返回所有消息,甚至是已中止的事务性消息。非事务性消息将在任一模式下无条件返回。
我从源主题收到一条消息。然后我将消息分成3个部分,并将每个部分发送到3个不同的主题。现在有2条消息成功传递到第2个主题。但是在发送第3条消息时,我们会收到异常(例如ProducerFencedException|OutOfOrderSequenceException|AuthorizationException|RecordLengthException) 如何回滚/还原前2个主题中的其他2条消息
我有一个消费转换产品应用程序,在Kafka中有一次精确的权杖。(事务性)生成阶段在同一主题上生成新消息,然后使用该消息(事务性=read_committed)。只有一个线程执行此操作,并确保消费者轮询在生产者的事务提交之后发生。现在我每轮只有一份民意调查报告。 当我运行我的测试用例时,有时可能会有其他生产者在我的生产者的事务提交之前发送的消息。然后我经历了以下情况: 我的单个poll语句只返回这个
这是我的消费者: 所以当运行我的制作人时,它最终会出错。任何人都知道这意味着什么,如果这可能是错的。
我很感激你在这方面的帮助。 我正在构建一个ApacheKafka消费者,以订阅另一个已经运行的Kafka。现在,我的问题是,当我的制作人将消息推送到服务器时。。。我的消费者没有收到。。我在打印的日志中得到以下信息: 我不确定我是否遗漏了任何重要的配置。。。但是,我可以使用WireShark看到一些来自我的服务器的消息,但是我的消费者没有消费这些消息。。。。 我的代码是示例消费者示例的精确副本:ht
我正在使用一个非事务性生产者,并试图理解如何处理成功/失败场景的回调。 对于一个成功的发送,我看到回调由kafka-producer-network-thread线程执行(“send ok”消息)。 发送消息成功-kafka-producer-network-thread 00:59:17.522
我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外: