我的Spring云流应用程序中有一个简单的Kafka生成器。当我的Spring应用程序启动时,我有一个@PostConstruct方法,它执行一些协调并尝试将事件发送给Kafka生产者。 问题是,当对账开始将enet发送到其中时,我的Kafka制作人还没有准备好,导致以下情况: org . spring framework . messaging . messagedeliveryexceptio
我正在创建一个简单的Kafka Streaming应用程序。我的Producer正在为一个主题生成protobuf序列化消息,我在Kafka Streaming应用程序中使用该主题来处理消费者消息。我正在尝试使用在我的应用程序中。yml文件。我发现以下错误。 错误: 我的配置文件: 在日志中打印实际反序列化消息的处理方法: 请帮我解决这个问题。如何在Kafka Stream中使用protobuf反
我需要能够从一开始就消费一个主题的所有消息。基本上与这个StackOverflow查询相同,但是针对Kafka 0.9进行了更新。(0.9特定的StackOverflow答案似乎相对较少)。 Kafka高级消费者使用Java API从主题获取所有消息(相当于从头开始) 0.9有一个完全不同的API,我真的不知道从哪里开始。我可以使用提供的bash脚本从命令行执行此操作,但不知道如何前进。 您能否为
我正在尝试使用高级消费者批量读取Kafka主题中的消息。在这批读取期间,我的线程必须在某个时候停止。 或者,一旦主题中的所有消息都用完了。或获取消息即将被读取时的最大偏移量,并停止直到达到最大偏移量。 我尝试在高级消费者处使用代码,但 KafkaStream 上的迭代器方法似乎是一个阻塞调用,并等待另一条消息传入。 所以3个问题, > 我怎么知道没有更多消息要从该主题中读取? 如果我对上述问题有答
我正在读取一个 json 文件并尝试使用 kafka 生成它。这是我的代码: 读取 json 文件的代码是: 但当我执行程序时,问题1: 它给出了错误,但是当我检查cosumer shell时,我得到如下结果:对于JSON文件中的一行,我在shell中看到4个条目..问题2: [root @ sandbox bin]#[root @ sandbox bin]#。/Kafka-console-con
当我尝试使用kafka控制台工具(V 0.9.0.1,我认为这使用了旧的消费者API)从托管在ec2中的kafka服务器接收消息时,我遇到了以下异常。我如何克服这一点? #./Kafka-控制台-消费者. sh -动物园管理员zookeeper1.xx.com:2181-话题我的_话题-从头开始
我使用以下代码来读取主题的数据,即“sha-test2”,但它正在读取完全替代的代码行,即 10 行中的 20 行。但是当我运行控制台时,它显示所有 20 行。即.bin/kafka-console-consumer.sh --zookeeper 本地主机:2181 --主题 sha-test2 --从头 我做错了什么?非常感谢您的帮助。
最近在使用 Kafka 时,我的应用程序需要从头开始访问主题中的所有消息。因此,在编写Kafka Consumer(使用Java API)时,我可以从头开始读取消息,但它只返回主题中的前500条消息。试图增加 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,整数.MAX_VALUE);props.put(ConsumerConfig.FETCH_M
我使用的是apachekafka最新版本2.3.0,我将kafka部署为多节点集群,并使用SSL进行代理间通信。设置部署到kubernetes 生成生成器日志的命令对我来说很好 但是,我无法使用日志: ./kafka控制台使用者。sh--引导服务器kafka-1.qa.*****。com:9902--主题测试--从头开始 服务器日志将错误显示为:
我需要编写外壳脚本来读取特定主题上的kafka消息,使用“bin/kafka-console-consumer.sh--主题快速启动-事件--from-开始--引导服务器localhost:9092”,只需要一定的时间,比如10秒。使用选项--time out 10s没有帮助,因为如果特定主题上的消息连续出现,该过程不会在10s之后停止。请建议如何做同样的事情。
如何在apache/kafka中使用regex消费所有主题?我尝试了上面的代码,但不起作用。
我正在尝试编写一个python代码来使用来自Confluent Kafka主题的数据,并作为测试项目的一部分执行数据验证。我能够读取数据,但是消费过程处于无限循环中,如果循环读取所有消息,则寻找退出的决策点。 参见下面的示例代码 请建议决定是否阅读所有消息的最佳方法,以便我可以退出循环并关闭消费者。
我对Kafka是陌生的。我用spring boot创建了一个kafka消费者(spring-kafka dependency)。在我的应用程序中,我使用了consumerFactory和producerfactory beans进行配置。所以在我的应用程序中,我创建了如下的kafka消费者。 我的配置如下 所以我想并行消费,因为我可能会收到更多的消息。关于使用并行主题,我发现我需要为一个主题创建多
我想创建一个带有Spring-Cloud-Streams的Kafka-Streams应用程序,该应用程序集成了2个不同的Kafka集群/设置。我尝试使用文档中提到的多绑定器配置来实现它,类似于以下示例:https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/multi-binder-samples 给定如下简单函数
我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用