我正试图装配一个KafkaStorm“你好世界”系统。我有Kafka安装和运行,当我发送数据与Kafka生产者我可以读取它与Kafka控制台消费者。
我从O'Reilly的《Storm入门》一书中选取了第二章的例子,并将其修改为使用Kafka斯库特(KafkaSpout)而不是普通的喷口。
当我运行应用程序时,kafka中的数据已经挂起,KafkaSpout的下一个线程不会收到任何消息——它进入,尝试在协调器下的空管理器列表上迭代,然后退出。
我的环境是一个相当旧的Cloudera虚拟机,有Storm 0.9和Kafka-Storm-0.9(最新版本)以及Kafka 2.9.2-0.7.0。
这是我如何定义SpoutConfig和拓扑的:
String zookeepers = "localhost:2181";
SpoutConfig spoutConfig = new SpoutConfig(new SpoutConfig.ZkHosts(zookeepers, "/brokers"),
"gtest",
"/kafka", // zookeeper root path for offset storing
"KafkaSpout");
spoutConfig.forceStartOffsetTime(-1);
KafkaSpoutTester kafkaSpout = new KafkaSpoutTester(spoutConfig);
//Topology definition
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", kafkaSpout, 1);
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter(),1)
.fieldsGrouping("word-normalizer", new Fields("word"));
//Configuration
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
有人能帮我弄清楚为什么我没有收到任何东西吗?
谢谢G
SpoutConfig spoutConfig = new SpoutConfig(new SpoutConfig.ZkHosts(zookeepers, "/brokers"),
"gtest", // name of topic used by producer & consumer
"/kafka", // zookeeper root path for offset storing
"KafkaSpout");
您正在使用"gtest"主题接收数据。请确保您是通过生产者发送来自此主题的数据。
在螺栓中,像那样打印元组
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple);
}
它应该用Kafka打印待处理的数据。
SpoutConfig spoutConf = new SpoutConfig(...)
spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
如果您已经使用了该消息,则不应该再读取该消息,除非您的生产者生成新消息。这是因为在代码中使用-1
调用forceStartOffsetTime。
表格暴风-配置留档:
喷口中另一个非常有用的配置是强制喷口倒带到上一个偏移的能力。在喷口配置上执行forceStartOffsetTime,如下所示:
spoutConfig.forceStartOffsetTime(-2);
它将选择围绕该时间戳写入的最新偏移量开始使用。可以通过传入-1强制喷口始终从最新偏移开始,也可以通过传入-2强制喷口从最早偏移开始。
你看起来怎么样?有一个片段会很有用。你可以用-2替换-1,看看你是否收到了什么,如果你的制作人很好,那么你应该可以消费。
我正在尝试把阿帕奇Storm和Kafka整合在一起。连接似乎建立良好,但没有收到任何消息。但是这些消息似乎也被发送到了Kafka服务器,而Kafka服务器中相应主题的索引文件显示存在一些数据。有没有一种方法可以在Storm End上调试这个更多的..?我正在使用Storm的客户解码器来处理信息。Storm的实现是:
下面是关于如何设置messenger机器人的Facebook教程-使用ngrok设置我的webhook。本地测试一切顺利,但在向bot发送消息时仍然没有收到任何响应。 韩国https://ngrok.com/ facebook教程https://developers.facebook.com/docs/messenger-platform/getting-started/quick-start/
我在mac上运行Kafka和Flink作为docker容器。 我已经实现了Flink作业,它应该消耗来自Kafka主题的消息。我运行一个向主题发送消息的python生产者。 工作开始时没有问题,但没有收到任何消息。我相信这些消息被发送到了正确的主题,因为我有一个能够使用消息的python消费者。 flink作业(java): Flink作业日志: 生产者作业(python):(在主机上运行-不是d
我正在尝试设置一个基本的Java消费者来接收来自Kafka主题的消息。我在-https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例-并具有以下代码: 和 Kafka在有问题的EC2主机上运行,我可以使用kafka-console-producer.sh和kafka-console-consumer.sh工具发送和接收关于主题“测试
你好,我一直在使用Spring Kafka活页夹作为消费者。通过查看日志,我能够连接到主题,尽管我不确定它为什么不处理来自制作人的任何消息。 你知道可能遗漏了什么吗?非常感谢。 聚甲醛 应用程序YML 消费者阶层 侦听器类 日志 从日志中可以看到,它能够连接到主题。虽然我不确定为什么我没有收到来自生产者的任何消息。是因为分区被撤销吗?这与为什么我没有收到任何消息有关吗?生产者来自第三方,他需要做些
我们使用Debezium(MongoDB)和Confluent S3连接器以分布式模式运行Kafka Connect(Confluent Platform 5.4,即Kafka 2.4)。通过REST API添加新连接器时,连接器将在RUNNING状态下创建,但不会为连接器创建任何任务。 暂停和恢复连接器没有帮助。当我们停止所有工作人员,然后再次启动他们时,任务就会创建,一切都会按应有的方式运行。