因此,根据我对Apache Kafka中事务的理解,read_committed消费者不会返回作为正在进行的事务一部分的消息。因此,我猜想,消费者可以选择将其偏移量提交给那些正在进行的事务消息(例如,读取非事务消息),或者可以选择在提交/中止遇到的事务之前不进一步推进。我只是假设(Kafka)允许跳过那些挂起的交易记录,但考虑到它的抵消可能已经很远了,那么消费者在提交时将如何读取它们呢?
更新
考虑主题可能包含来自非事务性生产者和事务性生产者的记录(aka messages)的混合。例如,从一个主题考虑这个分区:
非Transact-XMsg,from-transact-producer1-msg,from-transact-producer2-msg,非Transact-YMSG
从关于隔离.level
的文档中:
消息将始终按偏移量顺序返回。因此,在read_committed
模式中,consumer.poll()
将只返回直到最后一个稳定偏移量(LSO)
的消息,该偏移量小于第一个打开事务的偏移量。特别是,在属于正在进行的事务的消息之后出现的任何消息都将被保留,直到相关事务已经完成。因此,当存在飞行事务时,read_committed
使用者将无法读取高水印。
我已经开始让我的制作人向Kafka发送数据,也让我的消费者提取相同的数据。当我在ApacheNIFI中使用ConsumerKafka处理器(kafka版本1.0)时,我脑海中很少有与kafka consumer相关的查询。 Q.1)当我第一次启动ConsumeKafka处理器时,我如何从开始和当前消息中读取消息? 问题2)以及在Kafka消费者关闭的情况下,如何在最后一条消费信息之后阅读信息? 在
我在java中有一个函数,在这个函数中我试图获取未读的消息。例如,如果我在broker中有偏移量为0、1、2的消息,这些消息已经被使用者读取,并且如果我关闭我的使用者一个小时。那时我产生的信息偏移量为3,4,5。之后,当我的消费者启动时,它应该从偏移量3读取消息,而不是从0读取消息。但是,它要么读取所有的消息,要么读取启动Kafka Consumer后产生的消息。我想读那些未读或未提交的消息 我尝
我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--消费者是每个记录的提交偏移量,还是根据10秒内消耗的记录数提交并提前偏移量?
我试图从__consumer_offsets主题中使用,因为这似乎是检索关于消费者的kafka度量(如消息滞后等)的最简单的方法。理想的方法是从jmx访问它,但希望先尝试一下,返回的消息似乎是加密的或不可读的。尝试添加stringDeserializer属性。有没有人对如何纠正这一点有什么建议?这里的提法也是重复的 重复的consumer_offset 没有帮助,因为它没有引用我的问题,即在Jav
我正在使用事务性KafkaProducer向主题发送消息。这个很管用。我使用的是具有read_committed隔离级别的KafkaConsumer,而我的seek和seekToEnd方法存在问题。根据文档,seek和seekToEnd方法给出了LSO(上次稳定偏移量)。但这有点让人摸不着头脑。因为它给我的价值总是一样的,主题结束了。无论最后一个条目是(由生产者提交的)还是中止的事务的一部分。例如
我正在读这篇: 自动提交提交偏移量最简单的方法是允许消费者为您执行。如果您配置启用。汽车commit=true,则每五秒钟消费者将提交客户端从poll()收到的最大偏移量。五秒钟的间隔是默认值,由设置“自动”控制。犯罪间隔ms.与消费者中的其他所有内容一样,自动提交由轮询循环驱动。无论何时进行轮询,使用者都会检查是否到了提交的时间,如果是,它将提交上次轮询中返回的偏移量。 也许问题是我的英语不好,