当前位置: 首页 > 知识库问答 >
问题:

Kafka没有收到Kafka的任何东西

艾安和
2023-03-14

我正试图装配一个KafkaStorm“你好世界”系统。我有Kafka安装和运行,当我发送数据与Kafka生产者我可以读取它与Kafka控制台消费者。

我从O'Reilly的《Storm入门》一书中选取了第二章的例子,并将其修改为使用Kafka斯库特(KafkaSpout)而不是普通的喷口。

当我运行应用程序时,kafka中的数据已经挂起,KafkaSpout的下一个线程不会收到任何消息——它进入,尝试在协调器下的空管理器列表上迭代,然后退出。

我的环境是一个相当旧的Cloudera虚拟机,有Storm 0.9和Kafka-Storm-0.9(最新版本)以及Kafka 2.9.2-0.7.0。

这是我如何定义SpoutConfig和拓扑的:

String zookeepers = "localhost:2181";

SpoutConfig spoutConfig = new SpoutConfig(new SpoutConfig.ZkHosts(zookeepers, "/brokers"),
        "gtest",
        "/kafka",  // zookeeper root path for offset storing
        "KafkaSpout");
spoutConfig.forceStartOffsetTime(-1);

KafkaSpoutTester kafkaSpout = new KafkaSpoutTester(spoutConfig);


//Topology definition
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", kafkaSpout, 1);
builder.setBolt("word-normalizer", new WordNormalizer())
    .shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter(),1)
    .fieldsGrouping("word-normalizer", new Fields("word"));

//Configuration
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());

有人能帮我弄清楚为什么我没有收到任何东西吗?

谢谢G

共有3个答案

盖夕
2023-03-14
SpoutConfig spoutConfig = new SpoutConfig(new SpoutConfig.ZkHosts(zookeepers, "/brokers"),
        "gtest", // name of topic used by producer & consumer
        "/kafka",  // zookeeper root path for offset storing
        "KafkaSpout");

您正在使用"gtest"主题接收数据。请确保您是通过生产者发送来自此主题的数据。

在螺栓中,像那样打印元组

public void execute(Tuple tuple, BasicOutputCollector collector) {
        System.out.println(tuple);
    }

它应该用Kafka打印待处理的数据。

夏骞尧
2023-03-14
SpoutConfig spoutConf = new SpoutConfig(...)
spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
萧玮
2023-03-14

如果您已经使用了该消息,则不应该再读取该消息,除非您的生产者生成新消息。这是因为在代码中使用-1调用forceStartOffsetTime。

表格暴风-配置留档:

喷口中另一个非常有用的配置是强制喷口倒带到上一个偏移的能力。在喷口配置上执行forceStartOffsetTime,如下所示:

   spoutConfig.forceStartOffsetTime(-2);

它将选择围绕该时间戳写入的最新偏移量开始使用。可以通过传入-1强制喷口始终从最新偏移开始,也可以通过传入-2强制喷口从最早偏移开始。

你看起来怎么样?有一个片段会很有用。你可以用-2替换-1,看看你是否收到了什么,如果你的制作人很好,那么你应该可以消费。

 类似资料:
  • 我正在尝试把阿帕奇Storm和Kafka整合在一起。连接似乎建立良好,但没有收到任何消息。但是这些消息似乎也被发送到了Kafka服务器,而Kafka服务器中相应主题的索引文件显示存在一些数据。有没有一种方法可以在Storm End上调试这个更多的..?我正在使用Storm的客户解码器来处理信息。Storm的实现是:

  • 下面是关于如何设置messenger机器人的Facebook教程-使用ngrok设置我的webhook。本地测试一切顺利,但在向bot发送消息时仍然没有收到任何响应。 韩国https://ngrok.com/ facebook教程https://developers.facebook.com/docs/messenger-platform/getting-started/quick-start/

  • 我在mac上运行Kafka和Flink作为docker容器。 我已经实现了Flink作业,它应该消耗来自Kafka主题的消息。我运行一个向主题发送消息的python生产者。 工作开始时没有问题,但没有收到任何消息。我相信这些消息被发送到了正确的主题,因为我有一个能够使用消息的python消费者。 flink作业(java): Flink作业日志: 生产者作业(python):(在主机上运行-不是d

  • 我正在尝试设置一个基本的Java消费者来接收来自Kafka主题的消息。我在-https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例-并具有以下代码: 和 Kafka在有问题的EC2主机上运行,我可以使用kafka-console-producer.sh和kafka-console-consumer.sh工具发送和接收关于主题“测试

  • 你好,我一直在使用Spring Kafka活页夹作为消费者。通过查看日志,我能够连接到主题,尽管我不确定它为什么不处理来自制作人的任何消息。 你知道可能遗漏了什么吗?非常感谢。 聚甲醛 应用程序YML 消费者阶层 侦听器类 日志 从日志中可以看到,它能够连接到主题。虽然我不确定为什么我没有收到来自生产者的任何消息。是因为分区被撤销吗?这与为什么我没有收到任何消息有关吗?生产者来自第三方,他需要做些

  • 我们使用Debezium(MongoDB)和Confluent S3连接器以分布式模式运行Kafka Connect(Confluent Platform 5.4,即Kafka 2.4)。通过REST API添加新连接器时,连接器将在RUNNING状态下创建,但不会为连接器创建任何任务。 暂停和恢复连接器没有帮助。当我们停止所有工作人员,然后再次启动他们时,任务就会创建,一切都会按应有的方式运行。