依赖使用阿尔帕卡Kafka3.0
我们有以下消费者设置。
enable.auto.commit = true
自动偏移。reset=最早
如果我们有enable.auto.commit = true,那么是否有可能从特定的偏移量/日期消费来自Kafka主题分区的消息?
使用< code>assign方法,而不是< code>subscribe方法
public void sample() {
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
TopicPartition partition = new TopicPartition("some-topic", 0);
consumer.assign(Arrays.asList(partition));
consumer.seek(partition, 0);
while (true) {
final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
}
}
据我所知,当您使用赋值方法而不是订阅方法时,没有必要在消费者属性中指定消费者组,因此可能会忽略这些参数(启用.auto.commit,auto.offset.reset)。使用 assign 方法并仅出于调试或测试目的进行搜索。
还有另一个叫做offsetsForTimes的方法来获得想要寻找的偏移量。
我正在开发一个模块,它使用来自Kafka主题的消息并发布到下游系统。在下游系统不可用的情况下,消费者不确认Kakfa消息。因此,当我的消费者收到消息时,当下游系统不可用时,kakfa的偏移量将不会被提交。但是如果我在下游系统启动后收到新消息,并且当我确认该消息时,最新的偏移量将被提交,并且消费者永远不会收到主题中没有偏移量提交的那些消息。
我有一个带有Kafka使用者的spring应用程序,它使用@KafKalisterner注释。正在使用的主题是日志压缩的,我们可能会遇到必须再次使用主题消息的情况。以编程方式实现这一目标的最佳方法是什么?我们不控制Kafka主题配置。
我正在开发一个使用的软件。我有一个用户订阅了多个主题,我想知道是否有一个订单接收来自这些主题的消息。我在我的电脑上尝试了一些组合,但我需要确定这一点。例 null [编辑]我想指定这两个主题各有一个分区,并且只有一个生产者和一个消费者。我需要首先阅读来自第一个主题的所有消息,然后阅读来自另一个主题的消息
2016-07-05 03:59:25.042 O.A.S.D.Executor[INFO]正在处理-2元组的接收消息:源:__System:-1,流:__Tick,ID:{},[30] 2016-07-05 03:59:25.946 O.A.S.D.Executor[INFO]正在处理-2元组的接收消息:源:__System:-1,流:__Metrics_Tick,ID:{},[60] 我的测试
有人能帮我弄清楚这件事吗。 谢了!
我创建了一个带有10个分区的Kafka主题,并尝试通过单个Kafka消费者来消费消息。但是,kafka consumer并不是从所有分区读取消息。更具体地说,它只使用来自5个特定分区的消息。示例:使用者仅使用来自[0,1,2,3,4]的消息。在重新启动之后,如果它开始使用来自[5,6,7,8,9]的消息,那么它将只使用来自这些分区的消息。下面是kafka-consumer-offset-check