当前位置: 首页 > 知识库问答 >
问题:

如何使用emit()和sync()从Storm spout输出元组流?

伯君浩
2023-03-14

(xpost github问题)

这是我目前为止的尝试。我想我有一些配置错误,因为我知道我的wsemitter正在发射future事件,但我的Storm UI显示没有喷出发射。

我想也许我不应该在spout函数中绑定监听器?

该函数是否被多次调用?(看起来像是...参见https://github.com/rallysoftware/node-storm/blob/master/lib/spout.js#L4)

var storm = require('node-storm');
var wsEmitter = require('./wsEmitter.js')();
wsEmitter.init();  // subscribe to websocket

var futuresSpout = storm.spout(function(sync) {
  var self = this;
  console.log('subscribing to ws');
  wsEmitter.on('future', function(data){       // websocket data arrived
    self.emit([data]);
    sync();
  });
})
.declareOutputFields(["a"]);

共有1个答案

周浩博
2023-03-14

结果我有两个问题。首先,我的拓扑没有执行,因为我的一个bolt(未显示)无法设置.declareoutputfields()

其次,我需要延迟喷口的发射,直到主管使用NextTick()请求一个发射。我通过缓冲所有传入的消息来做到这一点,直到主管调用spout:

module.exports = (function(){
  var storm = require('node-storm');

  var wsEmitter = require('./wsEmitter.js')();
  wsEmitter.init();

  var queue = [];
  var queueEmpty = true;

  wsEmitter.on('thing', function(data){
    var trade = JSON.parse(data);
    trade.timeReported = new Date().valueOf();
    queue.push(trade);
    queueEmpty = false;
  });

  return storm.spout(function(sync) {
    var self = this;
    setTimeout(function(){
      if(!queueEmpty){
        self.emit([queue.shift()]);
        queueEmpty =
        ( queue.length === 0
        ? true
        : false )
      }
      sync();
    }, 100);

  })
  .declareOutputFields(['trade'])
})()
 类似资料:
  • 我有一个拓扑并在本地模式下运行它,比如-->-->。而且,我定义了一个对象,它是从B到C发出的值。我有两个问题: 我在中设置并在创建时设置新的。当为时,它将发送到。我认为Storm会克隆并在中生成一个新的,然而,当我试图更新的内容时,它仍然在中发生变化。为什么?这两个bolt可能位于不同的服务器中,并且不可能在多个bolt之间共享内存中的相同变量。

  • 因此,我试图分析我的元组集,看看对于该集中的x,是否有两对分别由2个元素和3个元素组成的元素。 所以就像每个第一个字符一样,必须有3个相同的字符和2个相同的字符。。。 但它们必须是连接的,例如如果有3A和2C,则顺序只能是AAAAC或CCAAA,而不能是CACAA或ACCAA等 那么比如说,, 下面是我对代码的尝试 然而,我的代码只有在它是一个列表时才起作用。。。当我放入一组元组时,它不起作用。。

  • 如何使用和方法将对象从一个控制器发送到另一个控制器? 它的工作方式并不像我想的那样。和如何工作?

  • 问题内容: 如何使用和方法将对象从一个控制器发送到另一个控制器? 它不按我认为的方式工作。如何做和工作? 问题答案: 首先,父子范围关系确实很重要。你有两种可能性发出某些事件: -将事件向下分发到所有子范围, -通过范围层次结构向上调度事件。 我对你的控制器(作用域)关系一无所知,但是有几种选择: 如果scope of 是作用域的父级,则你的代码应通过替换为来工作: 如果你的范围之间没有父子关系,

  • 问题内容: 我正在阅读有关PyQt5的一些文档,以提出一种简单的信号插槽机制。由于设计方面的考虑,我停了下来。 考虑以下代码: 为了跟踪对滑块所做的更改,我仅打印并记录所做的更改。我对代码不满意的是,我需要调用三次插槽以将相同的信息发送到3个不同的插槽。 是否可以创建自己的将整数发送到单个插槽的函数。插槽功能又会发出需要进行的更改吗? 也许我不完全了解它的目的,因为在PyQt Signal-Slo

  • 问题内容: 我该如何转: 到这个: 或者更确切地说,是否有任何有用的资源可供阅读,以便能够生成该JSON输出?所以我知道我需要将我的数据“转换”为正确的数据结构。之后,就像 谢谢 问题答案: 使用json库。 然后使用以下方法转换数据: 并致电: