当前位置: 首页 > 知识库问答 >
问题:

Storm ShellBolt“丢失”元组

盖博简
2023-03-14

我有一个拓扑结构如下所示:

注意,Bolt2和Bolt3从Bolt1和Bolt4接收元组。所有bolt都是运行python脚本的ShellBolts,而spout是运行从RabbitMQ读取的python脚本的ShellSpout。除了Bolt4之外,所有的工作都是预期的。如果我在RabbitMQ中一次添加一条消息,它就会全部工作并且干净利落地完成。如果我在Bolt4上挂起消息时对消息进行排队,则Bolt4将永远不会处理该消息。其他螺栓仍然执行它们的功能,但Bolt4将只是坐在那里完成第一个。

Storm UI显示所有的元组都由bolt4执行,但只有一个被加密。没有失败。我使用的是Storm0.9.5和Storm-Starter中包含的multilang python适配器。

编辑:这里有一个最小的失败案例。

螺栓1.py:

import storm
import time
import json


class Bolt1(storm.BasicBolt):
    # overrides storm.Bolt.process

    def process(self, tup):
        objID, APIArgs = tup.values
        APIArgs = json.loads(APIArgs)
        self.emit("bolt3Queue", objID, **APIArgs)
        self.emit("bolt2Queue", objID, **APIArgs)
        self.emit("bolt4Queue", objID, **APIArgs)
        storm.ack(tup)

    def emit(self, stream, objID, **APIArgs):
        tup = [objID, json.dumps(APIArgs)]
        storm.log("Emit [%s] %s" % (stream, tup))
        storm.emit(tup, stream=stream)

if __name__ == '__main__':
    Bolt1().run()

螺栓2.py:

import storm
import time
import json


class Bolt2(storm.BasicBolt):
    # overrides storm.Bolt.process

    def process(self, tup):
        objID, APIArgs = tup.values
        APIArgs = json.loads(APIArgs)
        storm.ack(tup)

    def emit(self, stream, objID, **APIArgs):
        tup = [objID, json.dumps(APIArgs)]
        storm.log("Emit [%s] %s" % (stream, tup))
        storm.emit(tup, stream=stream)

if __name__ == '__main__':
    Bolt2().run()
import storm
import time
import json


class Bolt3(storm.BasicBolt):
    # overrides storm.Bolt.process

    def process(self, tup):
        objID, APIArgs = tup.values
        APIArgs = json.loads(APIArgs)
        storm.ack(tup)

    def emit(self, stream, objID, **APIArgs):
        tup = [objID, json.dumps(APIArgs)]
        storm.log("Emit [%s] %s" % (stream, tup))
        storm.emit(tup, stream=stream)

if __name__ == '__main__':
    Bolt3().run()
import storm
import time
import json


class Bolt4(storm.BasicBolt):
    # overrides storm.Bolt.process

    def process(self, tup):
        objID, APIArgs = tup.values
        APIArgs = json.loads(APIArgs)
        self.emit("bolt3Queue", objID, **APIArgs)
        self.emit("bolt2Queue", objID, **APIArgs)
        storm.ack(tup)

    def emit(self, stream, objID, **APIArgs):
        tup = [objID, json.dumps(APIArgs)]
        storm.log("Emit [%s] %s" % (stream, tup))
        storm.emit(tup, stream=stream)

if __name__ == '__main__':
    Bolt4().run()
import storm
import random

class Spout(storm.Spout):

    def nextTuple(self):
        storm.emit(["id1234", "{}"], id=str(random.randint(1, 10000)))

if __name__ == '__main__':
    Spout().run()
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.ShellSpout;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.*;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;


public class PyroTopology {
  public static class PythonBolt extends ShellBolt implements IRichBolt {

    public PythonBolt(String script) {
      super("python", script);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  public static class Bolt4 extends ShellBolt implements IRichBolt {

    public Bolt4() {
      super("python", "bolt4.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declareStream("bolt3Queue", new Fields("objID", "APIArgs"));
      declarer.declareStream("bolt2Queue", new Fields("objID", "APIArgs"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  public static class Bolt1 extends ShellBolt implements IRichBolt {

    public Bolt1() {
      super("python", "bolt1.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declareStream("bolt3Queue", new Fields("objID", "APIArgs"));
      declarer.declareStream("bolt2Queue", new Fields("objID", "APIArgs"));
      declarer.declareStream("bolt4Queue", new Fields("objID", "APIArgs"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  public static class PythonSpout extends ShellSpout implements IRichSpout {

    public PythonSpout() {
      super("python", "spout.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("objID", "APIArgs"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  public static void main(String[] args) throws Exception {

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("rabbit", new PythonSpout(), 1);

    builder.setBolt("bolt1", new Bolt1(), 1).
            shuffleGrouping("rabbit");

    builder.setBolt("bolt4", new Bolt4(), 1).
            shuffleGrouping("bolt1", "bolt4Queue");

    builder.setBolt("bolt3", new PythonBolt("bolt3.py"), 1).
            shuffleGrouping("bolt1", "bolt3Queue").
            shuffleGrouping("bolt4", "bolt3Queue");

    builder.setBolt("bolt2", new PythonBolt("bolt2.py"), 1).
            shuffleGrouping("bolt1", "bolt2Queue").
            shuffleGrouping("bolt4", "bolt2Queue");

    Config conf = new Config();
    conf.setStatsSampleRate(1.0);
    conf.put(Config.TOPOLOGY_DEBUG, true);
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 5);
    conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    }
    else {
      conf.setMaxTaskParallelism(3);

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("word-count", conf, builder.createTopology());

      Thread.sleep(30000);

      cluster.shutdown();
    }
  }
}
storm jar target/storm-starter-0.9.5-jar-with-dependencies.jar storm.starter.PyroTopology vb

共有1个答案

戚正业
2023-03-14

您正在使用basicbolt,它会为您自动处理加密。因此,不能在代码中手动ack输入元组。这会导致单个元组的多个ack,从而混淆了Storm跟踪ack的机制(通过对消息ID和ack ID进行异或)。作为另一种选择(如果您需要高级的加密行为,您可以实现bolt

如UI所示,spout没有接收到ack,因此当达到最大spout挂起时,Storm会停止发出元组。此外,您会看到bolt的“executed”和“acked”的计数不匹配--这也表明没有正确处理acks。

 类似资料:
  • 早上好,我试图在我们的代码中找到一个错误,关于当xml没有被格式化时使用JAXB解组XML。我已经做了很多调试和测试,但仍然找不到错误。 xml的解释部分如下所示: 列表最多可包含50.000<代码> 为此,我们有以下代码。解组器的创建被移动到一个接口,以区别我们希望通过jaxb解组的元素,包括子元素(如元素)和那些(如

  • 我有storm topology(1个worker)设置,其中spout(java)从redis出列(使用blpop)事件并传输到bolts。但是有一个观察到的情况是,当队列超过200万,并且在storm Nimbus/Supervisor/Zookeeper/Worker日志中没有发现警告/异常时,一些事件没有被bolt接收(在clojure中,6-spout线程、50-bolt线程)。

  • 问题内容: 我今天犯了升级Eclipse的错误,现在无法启动新的Android项目。 我收到消息Proguard.cfg(找不到文件)。 我似乎在哪里找不到这东西?是否有可能摆脱它我在这个项目中不需要混淆… 谢谢 问题答案: 如果您确实不需要Proguard来混淆发行版,则可以从项目根文件夹的default.properties文件中删除以下行: proguard.config = proguar

  • 问题内容: 我想在我的网站上使用,但得到以下信息: 我试过打印。输出以下内容: 谁能帮助我找到或建议替代方案? 问题答案: 从文档中: 页面(如果有的话)的地址,该页面将用户代理引至当前页面。这是由用户代理设置的。并非所有的用户代理都将设置此功能,有些用户代理提供了将HTTP_REFERER修改为功能的功能。简而言之,它不能真正被信任。

  • 问题内容: 我目前正在一个小项目上使用RPi 3B(最新的Raspbian Jessie),该项目涉及播放简短的.mp4文件。由于Pygame似乎支持播放.mpg文件,因此我将视频转换为该格式。 但是,当我尝试导入movie模块时,出现了常见的python导入错误: 经过研究,我发现其他人也遇到了与此处所述相同的问题。我真的不知道是否有解决方案,或者我在RPi上有什么替代方案。 Pygame,py

  • 我厌倦了在我的代码中找到实际问题。我正在尝试在我的 android 项目中使用 FCM。我按照 firebase 控制台中给出的分步程序来激活我的项目中的服务,但我遇到了类似“错误”的问题:任务“:app:processDebugGoogleServices”执行失败。 文件 google-services.json 丢失。没有它,Google 服务插件将无法运行。搜索到的位置:C:\Users\