我需要编写外壳脚本来读取特定主题上的kafka消息,使用“bin/kafka-console-consumer.sh--主题快速启动-事件--from-开始--引导服务器localhost:9092”,只需要一定的时间,比如10秒。使用选项--time out 10s没有帮助,因为如果特定主题上的消息连续出现,该过程不会在10s之后停止。请建议如何做同样的事情。
你可以使用外部脚本,在期望的时间后杀死它。Kafka-console-consumer-timeot-ms flag代表空闲时间,而不是您正在寻找的时间。
或者,如果你知道你的消息速率/秒,你可以在--max消息之后停止,并评估10秒内你会收到多少消息。
我正在尝试用3个代理&Zookeeper来测试运行一个单独的Kafka节点。我希望使用控制台工具进行测试。我是这样管理制作人的: 然后我以这样的方式运行消费者: 我可以在生产者中输入消息,并在消费者中看到它们,这是预期的。但是,当我使用bootstrap-server运行消费者的更新版本时,我什么也得不到。例如: 当我有一个代理在端口9092上运行时,这工作得很好,所以我彻底搞糊涂了。有没有办法让
我已经在ec2上的一台机器上设置了一个kafka zookeeper和3个代理,端口为9092..9094,并且正在尝试使用另一台机器上的主题内容。端口2181(zk)、9092、9093和9094(服务器)对使用者计算机开放。我甚至可以做一个 主题:远程访问分区计数:1 ReplicationFactor:3 Configs:主题:远程访问分区:0 Leader:2 Replicas:2,0,1
我一直在尝试将kafka-avro-console-consumer从Confluent连接到我们的遗留Kafka集群,该集群是在没有Confluent模式注册表的情况下部署的。我使用以下属性显式提供了模式: 但我得到的是‘未知的魔法字节!’出错 是否可以使用Confluent kafka-avro-console-consumer(未使用Confluent的AvroSerializer序列化)和
我用本地安装的Confluent4.0.0尝试了官方模式-注册表-汇合示例(Consumer/Producer),它可以在发送post请求和在listener接收时发送“Sensor”avro消息,但当我使用Confluent4.0.0附带的kafka-avro-console-consumer工具查看发送的avro消息时,该工具引发了以下错误(a)。我还尝试使用kafka-avro-consol
我在上构建了一个排队系统。应用程序将为特定的生成消息,在使用者端,我必须使用为该主题生成的所有记录。 我使用新的Java使用者API编写了consumer。代码看起来像 这里我需要永远运行消费者,这样生产者推入kafka主题的任何记录都应该立即消费和处理。 所以我的困惑是,使用无限while循环(像示例代码中那样)消费数据是正确的方法吗?
任何立即的帮助都将不胜感激。 谢谢,米娜