We use Akka Streams Kafka to produce and consume messages against a Strimzi Kafka cluster. Here are the relevant versions:
com.typesafe.akka:akka-stream-kafka_2.13:2.0.7
com.typesafe.akka:akka-stream_2.13:2.6.14
org.apache.kafka:kafka-clients:2.4.1 (*)
org.scala-lang.modules:scala-collection-compat_2.13:2.1.6
org.scala-lang:scala-library:2.13.5
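After we refactored how messages are emitted, the consumer stopped working. There are messages in the topic, but the consumer just waits endlessly.
Here is the log fragment: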
[2021-04-14 21:20:43,869] [INFO] [org.apache.kafka.common.utils.AppInfoParser] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-8] - Kafka version: 2.4.1
[2021-04-14 21:20:43,869] [INFO] [org.apache.kafka.common.utils.AppInfoParser] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-8] - Kafka commitId: c57222ae8cd7866b
[2021-04-14 21:20:43,869] [INFO] [org.apache.kafka.common.utils.AppInfoParser] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-8] - Kafka startTimeMs: 1618424443866
[2021-04-14 21:20:43,879] [INFO] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-8] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Subscribed to topic(s): xyz-abc-import-dev-abc-input-topic
[2021-04-14 21:20:45,907] [INFO] [org.apache.kafka.clients.Metadata] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-19] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Cluster ID: L9OdIPABTGa7V9OPdViAaw
[2021-04-14 21:20:45,973] [INFO] [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-21] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Discovered group coordinator kafka-staging-abc-cluster-kafka-0-kafka-abc.xyzabccluster-host:443 (id: 2147483647 rack: null)
[2021-04-14 21:20:46,245] [INFO] [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-21] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] (Re-)joining group
[2021-04-14 21:20:47,554] [INFO] [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-9] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] (Re-)joining group
[2021-04-14 21:20:50,780] [INFO] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-22] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Finished assignment for group at generation 5: {consumer-xyz-abc-import-1-995fd3d7-24b5-480d-90bc-b0967f0898f0=Assignment(partitions=[xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1])}
[2021-04-14 21:20:51,114] [INFO] [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-16] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Successfully joined group with generation 5
[2021-04-14 21:20:51,125] [INFO] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-16] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Adding newly assigned partitions: xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1
[2021-04-14 21:20:51,334] [INFO] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-18] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Setting offset for partition xyz-abc-import-dev-abc-input-topic-0 to the committed offset FetchPosition{offset=38, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=kafka-staging-abc-cluster-kafka-0-kafka-abc.xyzabccluster-host:443 (id: 0 rack: null), epoch=0}}
[2021-04-14 21:20:51,336] [INFO] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-18] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Setting offset for partition xyz-abc-import-dev-abc-input-topic-1 to the committed offset FetchPosition{offset=51, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=kafka-staging-abc-cluster-kafka-1-kafka-abc.xyzabccluster-host:443 (id: 1 rack: null), epoch=0}}
A few more points:
KafkaConsumer<String, MyMsg> consumer = new KafkaConsumer<String, MyMsg>(props);
consumer.subscribe(Collections.singletonList(inputTopic), rebalanceListener);
ConsumerRecords<String, MyMsg> records = consumer.poll(Duration.of(60L, ChronoUnit.SECONDS));
This is where the code gets stuck. I try to take 2 messages with a blocking call (and cannot get even 1):
final Source<ProducerMessage.Results<String, Object, ConsumerMessage.PartitionOffset>, NotUsed> stream = ...
stream.take(SUBMISSION_SIZE).runWith(Sink.ignore(), mat).toCompletableFuture().get();
Not quite sure how to debug it. 8-(
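One thing that may help while debugging is to bound the blocking wait, so that the test fails quickly instead of hanging. The sketch below uses only java.util.concurrent. The `BoundedWait` helper and the never-completed `stalled` future are hypothetical stand-ins for the stalled stream, not part of the original test.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedWait {
    enum Outcome { COMPLETED, TIMED_OUT, FAILED, INTERRUPTED }

    /** Waits for the future for at most timeoutMillis and reports what happened. */
    static Outcome await(CompletableFuture<?> future, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return Outcome.COMPLETED;
        } catch (TimeoutException e) {
            // Nothing arrived in time: the caller can now fail the test or dump state.
            return Outcome.TIMED_OUT;
        } catch (ExecutionException e) {
            // The future completed exceptionally, e.g. the stream failed.
            return Outcome.FAILED;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Outcome.INTERRUPTED;
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for a stream that never emits anything.
        CompletableFuture<String> stalled = new CompletableFuture<>();
        System.out.println(await(stalled, 200));

        CompletableFuture<String> done = CompletableFuture.completedFuture("ok");
        System.out.println(await(done, 200));
    }
}
```

In the test above, the equivalent change would be to call `get(30, TimeUnit.SECONDS)` instead of `get()` on the CompletableFuture. The 30 seconds is an arbitrary value.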
[UPD 1]
Could it somehow be related to transactions? A plain Akka Streams consumer can see the messages and consume them:
@Test
@Order(3)
void exploreTopic() throws IOException, ExecutionException, InterruptedException {
    Consumer.DrainingControl<java.util.List<ConsumerRecord<String, Object>>> controlCompletionStagePair =
        Consumer.plainSource(consumerSettings, Subscriptions.topics(inputTopic))
            .take(SUBMISSION_SIZE)
            .map(x -> {
                System.out.println(x);
                return x;
            })
            .toMat(Sink.seq(), Consumer::createDrainingControl)
            .run(mat);
    controlCompletionStagePair.streamCompletion().toCompletableFuture().get();
    System.out.println("xxx");
}
[UPD 2]
I turned on DEBUG-level logging and saw that my partitions are being paused:
[2021-04-14 23:46:05,122] [DEBUG] [org.apache.kafka.clients.consumer.internals.Fetcher] [kafka-coordinator-heartbeat-thread | xyz-abc-import] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Fetch READ_COMMITTED at offset 40 for partition xyz-abc-import-dev-abc-input-topic-0 returned fetch data (error=NONE, highWaterMark=40, lastStableOffset = 40, logStartOffset = 40, preferredReadReplica = absent, abortedTransactions = [], recordsSizeInBytes=0)
[2021-04-14 23:46:05,725] [DEBUG] [org.apache.kafka.clients.consumer.internals.Fetcher] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-6] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Added READ_COMMITTED fetch request for partition xyz-abc-import-dev-abc-input-topic-0 at position FetchPosition{offset=40, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=kafka-staging-abc-cluster-kafka-0-kafka-abc.xyzabccluster-host:443 (id: 0 rack: null), epoch=0}} to node kafka-staging-abc-cluster-kafka-0-kafka-abc.xyzabccluster-host:443 (id: 0 rack: null)
[2021-04-14 23:46:05,725] [DEBUG] [org.apache.kafka.clients.consumer.internals.Fetcher] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-6] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Added READ_COMMITTED fetch request for partition xyz-abc-import-dev-abc-input-topic-1 at position FetchPosition{offset=61, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=kafka-staging-abc-cluster-kafka-1-kafka-abc.xyzabccluster-host:443 (id: 1 rack: null), epoch=0}} to node kafka-staging-abc-cluster-kafka-1-kafka-abc.xyzabccluster-host:443 (id: 1 rack: null)
[2021-04-14 23:46:05,725] [DEBUG] [org.apache.kafka.clients.FetchSessionHandler] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-6] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Built incremental fetch (sessionId=339828876, epoch=1) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
[2021-04-14 23:46:05,725] [DEBUG] [org.apache.kafka.clients.FetchSessionHandler] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-6] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Built incremental fetch (sessionId=23429972, epoch=1) for node 1. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
[2021-04-14 23:46:05,725] [DEBUG] [org.apache.kafka.clients.consumer.internals.Fetcher] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-6] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Sending READ_COMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(xyz-abc-import-dev-abc-input-topic-0)) to broker kafka-staging-abc-cluster-kafka-0-kafka-abc.xyzabccluster-host:443 (id: 0 rack: null)
[2021-04-14 23:46:05,726] [DEBUG] [org.apache.kafka.clients.consumer.internals.Fetcher] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-6] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Sending READ_COMMITTED IncrementalFetchRequest(toSend=(xyz-abc-import-dev-abc-input-topic-1), toForget=(), implied=()) to broker kafka-staging-abc-cluster-kafka-1-kafka-abc.xyzabccluster-host:443 (id: 1 rack: null)
[2021-04-14 23:46:05,796] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-7] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:05,866] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-10] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:05,935] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-11] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,009] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-13] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,075] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-15] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,149] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-18] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,215] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-20] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,288] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-21] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,355] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-8] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,355] [DEBUG] [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [kafka-coordinator-heartbeat-thread | xyz-abc-import] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Sending Heartbeat request to coordinator kafka-staging-abc-cluster-kafka-0-kafka-abc.xyzabccluster-host:443 (id: 2147483647 rack: null)
[2021-04-14 23:46:06,427] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-9] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,459] [DEBUG] [org.apache.kafka.clients.FetchSessionHandler] [kafka-coordinator-heartbeat-thread | xyz-abc-import] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Node 1 sent an incremental fetch response for session 23429972 with 0 response partition(s), 1 implied partition(s)
[2021-04-14 23:46:06,459] [DEBUG] [org.apache.kafka.clients.FetchSessionHandler] [kafka-coordinator-heartbeat-thread | xyz-abc-import] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Node 0 sent an incremental fetch response for session 339828876 with 0 response partition(s), 1 implied partition(s)
[2021-04-14 23:46:06,500] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-12] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,557] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-14] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,559] [DEBUG] [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-14] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Received successful Heartbeat response
[2021-04-14 23:46:06,625] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-16] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,697] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-17] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
[2021-04-14 23:46:06,765] [DEBUG] [org.apache.kafka.clients.consumer.KafkaConsumer] [SchemaRegistrySerializationTest-akka.kafka.default-dispatcher-19] - [Consumer clientId=consumer-xyz-abc-import-1, groupId=xyz-abc-import] Pausing partitions [xyz-abc-import-dev-abc-input-topic-0, xyz-abc-import-dev-abc-input-topic-1]
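A note on reading the DEBUG output above. The first Fetcher line reports a fetch at offset 40 with lastStableOffset = 40 and recordsSizeInBytes=0. If I read that correctly, a READ_COMMITTED consumer had nothing left to fetch on partition 0 at that moment. As far as I understand, Alpakka Kafka also pauses partitions on each poll while no stage is requesting records, so the repeated "Pausing partitions" lines may be ordinary back-pressure rather than the fault itself. The sketch below pulls the numbers out of such a Fetcher line and computes the gap between the fetch position and the last stable offset. `FetchLogLag` is a hypothetical helper, and the regex assumes the exact log layout shown above.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class FetchLogLag {
    // Assumes the Fetcher DEBUG layout seen in the log above (kafka-clients 2.4.1).
    private static final Pattern FETCH = Pattern.compile(
        "Fetch READ_COMMITTED at offset (\\d+) for partition (\\S+) returned fetch data "
            + "\\(.*?lastStableOffset\\s*=\\s*(\\d+)");

    /**
     * Returns lastStableOffset minus the fetch offset for a matching Fetcher line,
     * or an empty OptionalLong if the line has a different shape.
     */
    static OptionalLong readCommittedLag(String logLine) {
        Matcher m = FETCH.matcher(logLine);
        if (!m.find()) {
            return OptionalLong.empty();
        }
        long fetchOffset = Long.parseLong(m.group(1));
        long lastStableOffset = Long.parseLong(m.group(3));
        return OptionalLong.of(lastStableOffset - fetchOffset);
    }

    public static void main(String[] args) {
        String line = "Fetch READ_COMMITTED at offset 40 for partition "
            + "xyz-abc-import-dev-abc-input-topic-0 returned fetch data "
            + "(error=NONE, highWaterMark=40, lastStableOffset = 40, logStartOffset = 40, "
            + "preferredReadReplica = absent, abortedTransactions = [], recordsSizeInBytes=0)";
        // A lag of zero means there is nothing committed left to read on that partition.
        System.out.println(readCommittedLag(line));
    }
}
```

A lag of zero on both partitions would suggest the records were already fetched and that the problem sits after the fetch, for example in deserialization. That is only a hypothesis the log does not confirm.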
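[UPD 3]
Could this be related to the topic being auto-created?
It is hard to say how it affects things, but after I added this property to the consumer, the consumer started working properly: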
props.put(AvroDatumProvider.REGISTRY_USE_SPECIFIC_AVRO_READER_CONFIG_PARAM, String.valueOf(Boolean.TRUE));
I am not sure what the real cause is, or why no error was thrown, but adding the property above helped, at least in my case.
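A producer sends messages to a topic with four partitions, and we have one consumer reading from that topic. The application runs all the time on weekdays, with one exception at weekends: it does not call the poll method over the weekend. Consumer configuration: auto-commit enabled, with an auto-commit interval of 5 s (the default). The application ran fine until one Sunday, when it resumed calling poll. We saw millions of messages being polled from the topic; the consumer was essentially polling every message in the topic. Comparing the new offsets with the offsets from before it stopped for the weekend
I am trying to consume a Kafka topic from a Spring Boot application. I am using Spring Cloud Stream with the versions listed below. Spring Boot starter parent: 2.5.7 Spring Cloud version: 2020.0.4 Below are the code and configuration: application.yml, message consumer class. The message publisher below publishes messages correctly. The publisher is written in a different microservice. pom.xml
D:\软件\Kafka\Kafka2.10-0.10.0.1\bin\windows I use the command above to consume messages. Is there anything I have missed? Please help. Those are the producer and the consumer...
Here is my consumer: When I run my producer, it ends up with an error. Does anyone know what this means and what might be wrong?
I have a list of topics (currently 10) that may grow in the future. I know we can spawn multiple threads (one per topic) to consume each topic, but in my case the number of consumer threads would grow with the number of topics. I do not want that, because the topics do not receive data very often, so the threads would sit idle. Is there a way to have a single consumer consume from all the topics? If so, how can we do that? Also, how would Kafka maintain the offsets? Please suggest an answer.
I have a simple Java producer, shown below. I am trying to read the data as follows, but the consumer does not read any messages from Kafka. If I add the following, the consumer starts reading from the beginning of the topic. But every time the consumer restarts, it reads messages from the beginning of the topic, which I do not want. If I add the following configuration when starting the consumer, it reads messages from the topic, but if the consumer restarts before processing all the messages, it does not read the unprocessed ones. Can someone tell me what is going wrong and how I can fix it?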
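The settings in the last question are not shown, so the following is only a general model of how a Kafka consumer picks its starting position, not a diagnosis. A committed offset for the group wins, and the auto.offset.reset policy applies only when there is none. `StartPosition` and `ResetPolicy` are hypothetical names for illustration and are not Kafka classes. The sketch ignores the out-of-range case, where the reset policy also applies.

```java
import java.util.OptionalLong;

final class StartPosition {
    // Mirrors the three values of the consumer setting auto.offset.reset.
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * Models where a consumer starts reading a partition.
     *
     * @param committed last committed offset of the group, if any
     * @param policy    value of auto.offset.reset
     * @param logStart  first offset still available in the partition
     * @param logEnd    offset that the next produced record will get
     */
    static long resolve(OptionalLong committed, ResetPolicy policy, long logStart, long logEnd) {
        if (committed.isPresent()) {
            // A committed offset takes precedence over the reset policy.
            return committed.getAsLong();
        }
        switch (policy) {
            case EARLIEST:
                return logStart;
            case LATEST:
                return logEnd;
            default:
                throw new IllegalStateException("no committed offset and auto.offset.reset=none");
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(OptionalLong.of(51), ResetPolicy.LATEST, 0, 61));   // 51
        System.out.println(resolve(OptionalLong.empty(), ResetPolicy.EARLIEST, 5, 61)); // 5
        System.out.println(resolve(OptionalLong.empty(), ResetPolicy.LATEST, 5, 61));   // 61
    }
}
```

Under this model, a consumer that restarts without having committed anything and uses the latest policy skips whatever was produced while it was down. A consumer that commits offsets only after processing resumes from its last committed position.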