当前位置: 首页 > 知识库问答 >
问题:

暴雨流喷流处理中nextTuple方法的几个问题

姚烨
2023-03-14

我正在Storm之上开发一些数据分析算法,对Storm的内部设计有一些疑问。我想模拟一个传感器数据在Storm中的产生和处理,因此我使用Spout通过在Spout的nextTuple方法中设置Hibernate方法,将传感器数据以恒定的时间间隔推送到后续的螺栓中。但从实验结果来看,喷口并没有按规定的速率推送数据。在实验中,系统中没有瓶颈螺栓。

共有1个答案

岳昊空
2023-03-14

我有一个类似的用例,我实现它的方法是使用tick_tuple

Config tickConfig = new Config();                                       
tickConfig.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 15);
...
...
builder.setBolt("storage_bolt", new S3Bolt(), 4).fieldsGrouping("shuffle_bolt", new Fields("hash")).addConfigurations(tickConfig);

然后在我的storage_bolt中(注意它是用python编写的,但您会有一个想法),我检查消息是否为tick_tuple,然后执行我的代码:

def process(self, tup): 
    if tup.stream == '__tick':                                              
        # Your logic that need to be executed every 15 seconds,
        # or what ever you specified in tickConfig.
        # NOTE: the maximum time is 600 s.                                            
        storm.ack(tup)                                                      
        return  
 类似资料:
  • 问题内容: 任何人都可以澄清一下下面的过程是否是正确的处理流程流的方法,而没有任何流缓冲区已满和阻塞问题 我正在从Java程序中调用外部程序,正在使用ProcessBuilder来构建流程,执行之后 我正在使用一种方法来处理流程 在我的方法中,我试图处理流程流 readStream方法用于读取我的流文本。 问题答案: 不,那不是正确的方法。 首先,在某些系统上,您的代码将永远停留在调用中,因为该过

  • null 其中lambda1、2等是条件检查函数,例如 但不知什么原因对我不起作用,也许还有其他方法?正如我从文档(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html)中了解到的,OutputTag用于创建标记为tag的附加消息。还是我错了?

  • 我试图理解方法是如何精确地处理并行流的,我不理解为什么下面的代码不返回这些字符串的串联。 代码如下: 该代码仅适用于顺序流,但对于并行流,它不会返回串联。每次输出都不同。有人能解释一下那里发生了什么事吗?

  • 我理解是使用实现背压的一种简单方法。我想明白,现在背压已经实现了,我们还需要来节流喷口吗? 谢谢!

  • 问题内容: 用Java产生和使用外部进程的流(IO)的正确方法是什么?据我所知,由于可能的缓冲区大小有限,因此应在与生成进程输入并行的线程中使用java结束输入流(进程输出)。 但是我不确定我是否最终需要与这些使用者线程进行同步,或者仅等待进程退出以使用方法就足够了,以确保所有进程输出实际上都被消耗了?IE是否有可能,即使进程退出(关闭其输出流),流的Java端仍存在未读数据?实际如何知道该过程何

  • 曾发表过多篇文章,但大多数都与处理错误消息有关,而不是处理过程中的异常处理。 我想知道如何处理流应用程序接收到的消息,并且在处理消息时出现异常?异常可能是由于多种原因造成的,如网络故障、RuntimeException等。, 有人能提出正确的方法吗?我应该使用setUncaughtExceptionHandler吗?还是有更好的方法