当前位置: 首页 > 知识库问答 >
问题:

保证消息处理的字计数

越运锋
2023-03-14

我正在尝试运行WordCount示例以保证消息处理。

有一个喷口

  1. WSpout-使用msgID发出随机句子

还有两个螺栓

>

  • SplitSentence-在单词中拆分句子并发出锚定

    字数计数-打印字数计数。

    我想用下面的代码实现的是,当一个句子中的所有单词都计算完毕时。必须确认与该句子对应的喷口。

    我向收藏家表示感谢。仅在最后一次bolt WordCount时确认(元组)。奇怪的是ack()的inspite在WordCount中被调用。execute(),对应的WSpout。未调用ack()。默认超时后,它总是失败。

    我真的不明白代码有什么问题。请帮我理解这个问题。谢谢你的帮助。

    下面是完整的代码。

    public class TestTopology {
    
        public static class WSpout implements IRichSpout {
            SpoutOutputCollector _collector;
        Integer msgID = 0;
        @Override
        public void nextTuple() {
            Random _rand = new Random();
            String[] sentences = new String[] { "There two things benefit",
                    " from Storms reliability capabilities",
                    "Specifying a link in the",
                    " tuple tree is " + "called anchoring",
                    " Anchoring is done at ",
                    "the same time you emit a " + "new tuple" };
    
            String message = sentences[_rand.nextInt(sentences.length)];
            _collector.emit(new Values(message), msgID);
            System.out.println(msgID + " " + message);
    
            msgID++;
        }
        @Override
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            System.out.println("open");
            _collector = collector;
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("LINE"));
        }
        @Override
        public void ack(Object msgID) {
            System.out.println("ack ------------------- " + msgID);
    
        }
        @Override
        public void fail(Object msgID) {
            System.out.println("fail ----------------- " + msgID);
    
        }
        @Override
        public void activate() {
            // TODO Auto-generated method stub
        }
        @Override
        public void close() {
    
        }
        @Override
        public void deactivate() {
            // TODO Auto-generated method stub
        }
        @Override
        public Map<String, Object> getComponentConfiguration() {
            // TODO Auto-generated method stub
            return null;
        }
    }
    
    public static class SplitSentence extends BaseRichBolt {
        OutputCollector _collector;
        public void prepare(Map conf, TopologyContext context,
                OutputCollector collector) {
            _collector = collector;
        }
    
        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for (String word : sentence.split(" ")) {
                System.out.println(word);
                _collector.emit(tuple, new Values(word));
            }
            //_collector.ack(tuple);
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }
    
    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();
    
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println("WordCount MSGID : " + tuple.getMessageId());
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            System.out.println(word + " ===> " + count);
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    
    }
    
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new WSpout(), 2);
        builder.setBolt("split", new SplitSentence(), 2).shuffleGrouping(
                "spout");
        builder.setBolt("count", new WordCount(), 2).fieldsGrouping("split",
                new Fields("word"));
        Config conf = new Config();
        conf.setDebug(true);
    
        if (args != null && args.length > 0) {
            conf.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], conf,
                    builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
    }
    
  • 共有1个答案

    黄伟
    2023-03-14

    WordCount扩展了BaseBasicBolt,确保元组在该螺栓中自动确认,如您在注释中所述。但是,SplitSession扩展了BaseRichBolt,它要求您手动确认元组。你没有应答,所以元组暂停。

     类似资料:
    • Storm 通过 Trident 对保证消息处理提供了不同的 level ,包括 best effort(尽力而为),at least once (至少一次)和exactly once(至少一次). 这张页面描述如何保证至少处理一次. What does it mean for a message to be "fully processed"?(一条信息被完全处理是什么意思) 一个 tuple

    • 现在我正在学习Storm的保证消息处理,对这一部分的一些概念感到困惑。 为了保证喷口发出的信息得到充分处理,Storm使用acker来实现这一点。每次喷口发出一个元组时,acker将分配初始化为0的“ack val”来存储元组树的状态。每次该元组的下游螺栓发出新元组或确认一个“旧”元组时,元组ID将与“ack val”异或。acker只需要检查“ack val”是否为0,就可以知道元组已被完全处理

    • 在下面的示例中,我有两个正在处理来自kafka的消息的服务实例,但我希望确保只在之后处理。 显然,通过将一个实例配置为仅从特定分区消费,可以很容易地解决这种情况,该分区将存储带有公共标识符的消息: 现在顺序得到了保证,将永远不会在之前处理。 但是,我在想这个问题是否可以用另一种方式来解决,直接在代码中而不是依赖基础设施?这看起来可能是微服务架构中的一个标准问题,但我不确定哪种方法是解决它的首选方法

    • 通常,我希望将消息发送到另一个路由来处理它,但我不希望为后续步骤修改该消息。做这件事最好的方法是什么? 我发现的另一个选择是使用异步sedaendpoint,它将原始消息返回给生产者并处理副本,但这会引入异步行为,而异步行为可能并不总是可取的。 看来一定有更好的办法?

    • 面试题 如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题? 面试官心理分析 这个是肯定的,用 MQ 有个基本原则,就是数据不能多一条,也不能少一条,不能多,就是前面说的重复消费和幂等性问题。不能少,就是说这数据别搞丢了。那这个问题你必须得考虑一下。 如果说你这个是用 MQ 来传递非常核心的消息,比如说计费、扣费的一些消息,那必须确保这个 MQ 传递过程中绝对不会把计费消息给弄丢。 面试题剖

    • 在FLTK中是通过Fl_Widegt::handle(),虚拟函数来处理系统的消息。我们可以查看Fltk的源代码来分析系统是怎样处理一些系统消息的,如按钮的消息处理 /******************************************************* Fl_Button中处理消息的代码,省略了具体的处理代码 *******************************