当前位置: 首页 > 知识库问答 >
问题:

Apache Storm没有收到来自kafka的消息

罗寒
2023-03-14

我正在尝试把阿帕奇Storm和Kafka整合在一起。连接似乎建立良好,但没有收到任何消息。但是这些消息似乎也被发送到了Kafka服务器,而Kafka服务器中相应主题的索引文件显示存在一些数据。有没有一种方法可以在Storm End上调试这个更多的..?我正在使用Storm的客户解码器来处理信息。Storm的实现是:

TopologyBuilder builder = new TopologyBuilder();
Broker brokerForPartition0 = new Broker("xxxxx");
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0);
StaticHosts hosts = new StaticHosts(partitionInfo);
SpoutConfig spoutConfig = new SpoutConfig(hosts, TOPIC, "/"+TOPIC, clientId);
spoutConfig.scheme = new MyLogScheme();
builder.setSpout("spout", new KafkaSpout(spoutConfig));
builder.setBolt("printer", new PrinterBolt());
Config conf = new Config();
conf.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());

共有1个答案

贺文彬
2023-03-14

您正在使用自己的方案“MyLogScheme”。问题可能在MyLogScheme中。尝试使用默认方案,即StringScheme。希望这能让你看到一些消息。

 类似资料:
  • 我在mac上运行Kafka和Flink作为docker容器。 我已经实现了Flink作业,它应该消耗来自Kafka主题的消息。我运行一个向主题发送消息的python生产者。 工作开始时没有问题,但没有收到任何消息。我相信这些消息被发送到了正确的主题,因为我有一个能够使用消息的python消费者。 flink作业(java): Flink作业日志: 生产者作业(python):(在主机上运行-不是d

  • 我已将flinkkafkaconsumer作为源添加到我的streamexecutionenvironment中。我想在特定时间内没有收到新消息时关闭/阻止flink使用数据(类似于kafka polltime)。目前它正在无限期运行,并阻止执行移动到下一步(验证消息)。请建议是否有任何解决方法。 注意:我从反序列化中尝试了endofstream,但它无法工作,因为流实际上是不确定的。 提前谢谢。

  • 我有一个名为“test-topic”的主题,有3个分区。 当我启动一个将group-id设置为“test-group”的使用者(consumer-1)时,它连接并读取主题上的所有分区。到目前为止还好。 当我在同一个组中启动另一个消费者(consumer-2)时,问题就出现了。我希望在两个消费者之间划分分区时能够重新平衡,例如,消费者-1得到分区0和2,消费者-2得到分区1。这种情况不会发生,当然我

  • 我使用的是spring boot 2.2.4版本,spring-kafka 2.4.2版本 我的场景是以下一个: 所以我写了folloqing代码 生产者微服务 spring kafka配置: 在制作人方面所有的工作都很好。我能创造话题和发送信息。 消费者微服务 动态侦听器类 当我在生产者端发送消息时,我可以看到以下日志: 在消费者方面,我没有看到任何信息。我只看到下面的指纹: 谁能告诉我我错在哪

  • 我正试图装配一个KafkaStorm“你好世界”系统。我有Kafka安装和运行,当我发送数据与Kafka生产者我可以读取它与Kafka控制台消费者。 我从O'Reilly的《Storm入门》一书中选取了第二章的例子,并将其修改为使用Kafka斯库特(KafkaSpout)而不是普通的喷口。 当我运行应用程序时,kafka中的数据已经挂起,KafkaSpout的下一个线程不会收到任何消息——它进入,

  • 我试图消费一个Kafka主题从Spring启动应用程序。我使用的是下面提到的版本的Spring云流 Spring boot starter父级:2.5.7 Spring云版本:2020.0.4 下面是代码和配置 application.yml 消息消费者类 下面的消息发布者正在正确地发布消息。发布者是在不同的微服务中编写的。 pom.xml