新的Kafka版本(0.11)只支持一次语义。
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98-精确一次交付和事务消息传递
我有一个用java编写的kafka事务代码制作程序,如下所示。
producer.initTransactions();
try {
producer.beginTransaction();
for (ProducerRecord<String, String> record : payload) {
producer.send(record);
}
Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
{
put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
}
};
producer.sendOffsetsToTransaction(groupCommit, "groupId");
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
}
我不太确定如何使用sendOffsetsToTransaction及其预期用例。AFAIK,消费者组是消费者端的多线程读取功能。
javadoc说
“将已消耗的偏移量列表发送给使用者组协调员,并将这些偏移量标记为当前事务的一部分。只有事务成功提交时,这些偏移量才会被视为已消耗。当您需要将已消耗和已生成的消息批处理在一起时,应该使用此方法,通常是在消耗-转换-生成模式中。”
生产部门如何维护消耗补偿的列表?这有什么意义?
这只与您正在使用的工作流相关,然后根据您使用的内容生成消息。此函数仅允许在下游生产成功时提交消耗的偏移量。如果您消费数据,以某种方式对其进行处理,然后生成结果,这将在整个消费/生产过程中实现事务性保证。
在没有事务的情况下,通常使用Consumer#commitSync()
或Consumer#commitSync()
来提交Consumer偏移量。但是,如果您在与制作人制作之前使用这些方法,那么在知道制作人是否成功发送之前,您就已经提交了偏移量。
因此,您可以在生产者上使用Producer#sendOffsetsToTransaction()
来提交偏移量,而不是向消费者提交偏移量。这会将偏移量发送给处理事务的事务管理器。只有当整个事务-消费和生产-成功时,它才会提交偏移量。
(注意:当您发送要提交的偏移量时,应在上次读取的偏移量中添加1,以便以后的读取从尚未读取的偏移量继续。无论您是与消费者还是生产者提交,这都是正确的。请参阅:KafkaProducer sendOffsetsToTransaction需要偏移量1才能成功提交当前偏移量)。
问题内容: 我在Android源代码中看到一个陌生的符号: 例如: 我对星号表示法不熟悉。有人可以解释吗? 问题答案: 是的简化版本 此表示法来自C。
我在看关于React的教程时,偶然发现了这样一句话: 现在,我承认我对JS总体上是非常陌生的,所以这可能只是对基础知识的无知,但是
问题内容: 我是Android Studio的新手,我想知道Android Studio中声明的目的。 问题答案: @Override是Java注释。它告诉编译器以下方法将覆盖其超类的方法。例如,假设您实现了一个Person类。 人员类具有equals()方法。equals方法已经在Person的超类Object中定义。因此,以上equals()的实现是对Persons的equals()的重新定义
问题内容: 我已经在Java7中阅读到了,我们现在可以编写以下有趣的语句: 问题是:在这种情况下究竟是什么意思? 问题答案: 仅出于可读性目的,Java 7中允许在数字文字中使用下划线字符。从javadocs: 在Java SE 7和更高版本中,数字文字中数字之间的任意位置都可以出现任何数量的下划线字符(_)。例如,通过此功能,您可以将数字文字中的数字组分开,从而可以提高代码的可读性
问题内容: PHP中的三点(…)是什么意思? 当我在服务器中安装Magento 2时,出现错误。检查该代码,发现有一个三点(…),这会产生错误。我在下面提到了代码 问题答案: 在被称为在PHP图示操作。 此功能使您可以捕获函数的可变数量的参数,并根据需要传递传入的“普通”参数。举个例子最容易看到: 函数声明中的参数列表中包含运算符,它的基本含义是“ …以及其他所有内容都应放入$ strings中”
问题内容: 假设我们有一个类名Home。是什么区别 Home.this 和 Home.class ?他们指的是什么? 问题答案: 这个 引用该类的当前实例。 此表达式的正式术语似乎是qualified this,如Java语言规范的15.8.4节所引用。 在一个简单的类中,说和将等效。此表达式仅在存在内部类且需要引用封闭类的情况下使用。 例如: 家庭类 将类的表示形式作为对象返回。 此表达式的正式