我正在使用Logstash Kafka输入插件从一个主题中读取消息。我以前能够启动新的消费者——属于新的消费者组,并且通过设置auto_offset_reset=earliest能够从主题的开始消费消息。
插件配置:
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["test_topic"]
group_id => "new_consumer"
client_id => "new_consumer"
consumer_threads => 1
auto_offset_reset => "earliest"
}
}
但是现在我注意到一个奇怪的行为。尽管这是属于新使用者组的新使用者,并且auto_offset_reset设置为“最早”,但我无法使用任何消息。
启用的调试日志遵循以下行为:它清楚地表明使用者没有以前的偏移量,突然提取分区偏移量,使用者使用它并设置其新偏移量(请注意:之前从主题中读取了36387条消息,因此在下面的日志中读取了数字)
[2016-12-22T16:45:13,454][INFO][org . Apache . Kafka . clients . consumer . internals . abstract coordinator]成功加入第1代新消费者群
[2016-12-22T16:45:13,455][INFO][org . Apache . Kafka . clients . consumer . internals . consumer coordinator]为组new_consumer设置新分配的分区[test_topic-0]
[2016-12-22T16:45:13,456][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]组new_consumer获取分区的已提交偏移量:[test_topic-0]
[2016-12-22T16:45:13,544][DEBUG][org . Apache . Kafka . clients . consumer . internals . consumer coordinator]Group new _ consumer没有分区测试_topic-0的提交偏移量
[2016-12-22T16:45:13,544][调试][org.apache.kafka.clients.consumer.internals.Fetcher]将分区的偏移量test_topic-0重置为最早的偏移量。
[2016-12-22T16:45:13,546][DEBUG][org.apache.kafka.clients.NetworkClient]在localhost:9092启动与节点0的连接。
[2016-12-22T16:45:13,657][调试][log stash . instrument . Collector]收集器:向观察者发送快照{:created_at=
[2016-12-22T16:45:13741][DEBUG][org.apache.kafka.common.metrics.metrics]添加了名为node-0.bytes-sent的传感器
[2016-12-22T16:45:13,741][调试][组织.apache.kafka.common.metrics.Metrics. 添加了名称为 node-0.bytes-的传感器
[2016-12-22T16:45:13,741][DEBUG][org . Apache . Kafka . common . metrics . metrics]添加了名为node-0.latency的传感器
[2016-12-22T16:45:13,742][调试][org.apache.kafka.clients.NetworkClient]节点0连接完成
[2016-12-22T16:45:13,901][DEBUG][org . Apache . Kafka . clients . consumer . internals . fetcher]获取了分区test_topic-0的偏移量36387
[2016-12-22T16:45:18050][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]组newconsumer为分区test_topic-0提交的偏移量36387
[2016-12-22T16:45:18,563][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]组new消费者提交的分区偏移36387test_topic-0
有人能告诉我为什么我们会看到这种行为吗?
旧消息是否已根据配置的保留期被删除?可能偏移量36387是最早的偏移量,所有较早的消息都已过期。默认保留期为7天。
我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?
我对Kafka相对来说是新的,我试图在主题上发送消息后产生消费者。 单个生产者在不同的分区上发送200个msg。 我多次运行消费者脚本。
我使用一个生产者在本地主机上运行的Kafka服务器中输入了一些消息。Zookeeper也在本地主机上。我使用了这里给出的。 但是,消费者似乎没有收到任何消息! 脚本可以提取所有这些消息,但代码不能。怎么了?消费者代码与该页面上给出的代码完全相同。 这是我发布消息的同一主题。以下是制作人的代码:
我有一个springboot消费者应用程序。当我第一次运行它时,它消耗了来自Kafka主题的信息。但当我再次运行它时,它停止了消耗。在日志中,我看到以下消息。 我知道消费者无法获得偏移量。在这种情况下,消费者将引用自动偏移重置属性。如您所见,我已将其设置为,希望消费者从头开始阅读。但它没有。 应用程序. yml 在我的Java课上 我尝试了一些东西。 我将值设置为。不出所料,它抛出了一个异常,抱怨
我们正在使用Kafka流将数据写入接收器主题。我正在运行一个avro消费者命令行来检查接收器主题中是否有数据: bin/kafka-avro控制台-消费者-主题sink.output.topic-从开始-新消费者-引导-服务器 当我在kafka streams应用程序运行时同时运行消费者时,我会看到数据,但如果我停止消费者并在几分钟后再次运行,我不会看到任何数据。几乎没有可能: 1) 这是因为Ka
我的用例是使用kafka消费者api,这样我们就可以从kafka主题中手动读取最后一次成功处理的数据的偏移量,然后手动确认Kafka的成功处理数据。(这是为了减少数据丢失)。然而,在我当前的实现中,程序向前移动并从下一个偏移读取,即使我注释掉了“ack.acknowledge()”。我是新来的Kafka和实现我的消费者下面的方式(我们使用Spring引导) 问题是:即使我注释掉ack.acknow