我试图理解Kafka的事务性API。此链接定义原子读-进程-写周期如下:
首先,让我们考虑原子读-进程-写周期是什么意思。简而言之,它意味着如果应用程序在某个主题分区tp0的偏移量X处消耗消息A,并在对消息A进行一些处理后将消息B写入主题分区tp1,使得B=F(A),那么只有当消息A和B被认为成功消耗并一起发布或根本不发布时,读-进程-写周期才是原子的。
它还说:
使用为至少一次交付语义配置的vanilla Kafka生产者和消费者,流处理应用程序可能会以以下方式丢失一次处理语义:
>
我们可能会重新处理输入消息A,导致重复的B消息被写入输出,违反了恰好一次处理语义学。如果流处理应用程序在写入B之后但在将A标记为已消耗之前崩溃,则可能会发生重新处理。因此,当它恢复时,它将再次消耗A并再次写入B,从而导致重复。
最后,在分布式环境中,应用程序会崩溃,或者——更糟!——暂时失去与系统其余部分的连接。通常,新实例会自动启动以替换被认为丢失的实例。通过这个过程,我们可能有多个实例处理相同的输入主题并写入相同的输出主题,导致重复输出并违反恰好一次处理的语义学。我们称之为“僵尸实例”问题。
我们在Kafka中设计了事务API来解决第二个和第三个问题。事务通过使这些循环原子化并促进僵尸Geofence来实现读-进程-写循环中的精确一次处理。
疑虑:
>
上面的第2点和第3点描述了何时会发生消息重复,这些重复使用事务性API处理。事务性API是否也有助于避免在任何情况下丢失消息?
Kafka事务性API的大多数在线(例如,这里和这里)示例包括:
while (true)
{
ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
producer.beginTransaction();
for (ConsumerRecord record : records)
producer.send(producerRecord(“outputTopic”, record));
producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
producer.commitTransaction();
}
这基本上是读-进程-写循环。那么事务性API只在读-进程-写循环中有用吗?
本文给出了非读-进程-写场景中事务API的示例:
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch(ProducerFencedException e) {
producer.close();
} catch(KafkaException e) {
producer.abortTransaction();
}
它说:
这允许生产者将一批消息发送到多个分区,以便该批消息中的所有消息最终对任何消费者可见,或者消费者永远看不到任何消息。
这个例子正确吗?它展示了另一种不同于读-写循环的事务性API的使用方式吗?(请注意,它也不会向事务提交偏移。)
在我的应用程序中,我只需使用来自Kafka的消息,进行处理并将其记录到数据库中。这就是我的全部管道。
a、 所以,我猜这不是读-写循环。Kafka事务API对我的场景有用吗?
b、 此外,我还需要确保每条消息只处理一次。我想在producer中设置idempotent=true
就足够了,我不需要事务API,对吧?
C。我可能会运行多个管道实例,但我不会将处理输出写入Kafka。所以我想这永远不会涉及僵尸(重复生产者写入kafka)。所以,我想事务性API不会帮助我避免重复处理场景,对吗?(我可能必须在同一数据库事务中将偏移量与处理输出一起保留到数据库中,并在生产者重启期间读取偏移量以避免重复处理。)
a、 所以,我猜这不是读-写循环。Kafka事务API对我的场景有用吗?
这是一个读写过程,除非你是在写数据库而不是Kafka。Kafka有自己的事务管理器,因此在一个具有幂等性的事务中写入将启用一次处理,前提是您可以正确恢复消费写入处理器的状态。您不能对数据库执行此操作,因为数据库的事务管理器与Kafka的事务管理器不同步。相反,您可以做的是确保即使Kafka事务相对于您的数据库不是原子事务,它们最终仍然是一致的。
让我们假设您的消费者读取、写入数据库,然后进行ack。如果数据库失败,您不会进行ack,您可以根据偏移量正常恢复。如果ack失败,您将处理两次并保存到数据库两次。如果您可以使此操作幂等,那么您是安全的。这意味着您的处理器必须是纯的,数据库必须消重:处理相同的消息两次应该总是会在数据库上导致相同的结果。
b、 此外,我还需要确保每条消息只处理一次。我想在producer中设置幂等=true就足够了,我不需要事务API,对吗?
假设您尊重a点的需求,在另一个存储上使用持久性处理一次,也要求在初始写入和复制之间,您正在保存的对象没有发生其他更改。想象一下,将一个值写为X,然后其他参与者将其更改为Y,然后重新处理消息并将其更改回X。例如,可以通过将数据库表设置为日志来避免这种情况,类似于Kafka主题。
C。我可能会运行多个管道实例,但我不会将处理输出写入Kafka。所以我想这永远不会涉及僵尸(重复生产者写入kafka)。所以,我想事务性API不会帮助我避免重复处理场景,对吗?(我可能必须在同一数据库事务中将偏移量与处理输出一起保留到数据库中,并在生产者重启期间读取偏移量以避免重复处理。)
是制作人向你消费的主题写信,这可能会产生僵尸信息。那个制片人需要和Kafka和睦相处,这样僵尸才会被忽略。事务性API和您的消费者将确保该生产者以原子方式写入,而您的消费者以原子方式读取提交的消息,尽管不是以原子方式。如果你想要一次幂等就足够了。如果消息应该以原子方式编写,那么您也需要事务。无论哪种方式,您的读写/消费处理器都需要是纯处理器,并且必须消除重复数据。你的数据库也是这个处理器的一部分,因为数据库是一个真正持续存在的数据库。
我在网上找了一些,也许这个链接可以帮你:处理保证
你发布的链接:Kafka的语义和交易非常棒。
我有事务性的和正常的生产者在应用程序,是写到主题Kafka-主题如下。 事务性Kafka生产者的配置 普通生产者配置相同,只有ProducerConfig.client_id_config和ProducerConfig.Transactional_id_config未添加。 使用者配置如下 因为我将isolation.level设置为read_committed,所以它应该只使用来自订阅主题的事务
我使用的是Spring-Kafka2.2.2.release(org.apache.kafka:kafka-clients:jar:2.0.1)和spring-boot(2.1.1)。我无法执行事务,因为我的侦听器无法获得分配的分区。我只为一个消费者创建了建议的配置。我正在尝试配置一个事务性侦听器容器,并且只处理一次 我使用事务管理器配置了生产者和使用者,生产者使用事务id,使用者使用isolat
以下是我的Kafka消费者属性:- Kafka Producer在其属性中有一个事务id,在推送一些消息之后,它将事务作为一个整体提交。以下是Kafka制作人的属性:- log.info(“初始化属性”);属性道具=新属性(); 我无法理解是否commit没有从生产者端正确地发生,从而导致Kafka消费者无法用事务性语义读取它,或者Kafka消费者端存在问题。 任何帮助都将不胜感激。
我们有一个Spring Boot应用程序,它使用来自IBM MQ的消息进行一些转换,并将结果发布到Kafka主题。我们使用https://spring.io/projects/spring-kafka为了这个。我知道Kafka不支持XA;然而,在文档中,我找到了一些关于使用ChainedKafkaTransactionManager链接多个事务管理器并同步事务的输入。同一文档还提供了一个示例,说明
我正在使用版本来使用来自主题的消息。在使用者配置中,自动提交设置为,而设置为。与服务器协商为10秒。 在收到消息后,我将它的一部分保存到数据库中。我的数据库有时会非常慢,这会导致kafka侦听器会话超时: 组MyGroup得自动偏移量提交失败:无法完成提交,因为组已重新平衡并将分区分配给另一个成员.这意味着对poll()的后续调用之间的时间比配置的session.timeout.ms长,这通常意味
然后,我对一个方法使用了注释,该方法执行以下操作: 这不起作用。是事务性的,但是当调用方法时,没有正在进行的事务,并且我得到一个。 我打算尝试方法,但Javadoc声明这只用于本地事务,因此它似乎不符合我的需要。 我的下一步是尝试直接使用Kafka的Producer API,看看这种模式是否有效,但如果有人能告诉我知道我在浪费时间,Kafka不支持事务性地写多个主题,我会很感激。 我确实在Conf