当前位置: 首页 > 知识库问答 >
问题:

Flink SQL中的控制流

单于正业
2023-03-14

通过stream API,我可以编写一个RichCoFlatMapFunction来接受一个控制流和一个数据流,控制流中包含了启动、停止或改变参数的计算元素,我知道我可以存储当前控制设置的状态,并在处理数据流时检查值。

但是用Flink SQL做类似的事情的方法是什么呢?我不能使用join,因为数据流和控制流不能连接在一起。

我们提出的解决方案是通过应用程序本身存储控件设置。其想法是:

>

  • 将控制流广播给map运算符,并将控制设置存储到其map()方法中的java单例对象,因为map运算符将以默认并行性运行,我们假设它将运行在该作业的所有JVM上,这样我们就确保每个JVM都将初始化并不断更新单例对象中的控制设置。

  • 共有1个答案

    葛志国
    2023-03-14

    我不认为那是个好主意。SQL并不是为这样的用例而设计的。相反,SQL查询是按照指定的方式优化和执行的。不打算更改查询的行为。除了design透视图之外,它也不能很好地执行,因为您需要为处理的每个记录执行分布式可查询状态的远程状态查找。这当然会增加延迟。

    对我来说,您的用例听起来更像是一个应用程序,而不是SQL查询。因此,DataStream API将是正确的选择。您可以做的是将SQL(或表API)查询嵌入到应用程序中,即使用SQL进行前后处理,并在中间设置一个带有控件/数据流模式的运算符。

     类似资料:
    • spring MVC中controllin异常流的良好实践是什么? 假设我有一个DAO类,它将对象保存到数据库中,但如果违反了某些规则,例如名称太长、年龄太低,则抛出异常, 现在,如果我想保存名称超过10的A,它应该抛出异常。 但是有一个dataManipulator对象 和控制器 我希望在不抛出异常的情况下保留控制器(我听说这是一个很好的做法)。 但我的问题是,在这种情况下,A\u Data\u

    • 我有一个问题是这个问题的变体:Flink:如何存储状态和在另一个流中使用? 我有两条流: val ipStream:DataStream[IP地址]= <代码>val routeStream:数据流[路由表]= 我想知道哪个包裹使用哪条路线。通常可以通过以下方式完成: 这里的问题是,我无法在这里真正为流设置密钥,因为这既需要完整的表,也需要ip地址(并且密钥必须独立计算)。 对于中的每个元素,我需

    • 这让我的头撞到了墙上!!!!希望有人能给我指出正确的方向。我确信我在做一些完全愚蠢的事情。我找了又找,但似乎找不出为什么这不管用!!(在IntelliJ IDE 15.x上使用JDK8.x和Scenebuilder)。我试图在GUI上显示数据,但希望access通过编程将这些数据从其他类/方法发送给它。。。。下面是一个简单的概念,我在着手更大的项目之前,正试图让它发挥作用: 我的简单GUI在FXM

    • 我无法在Eclipse中运行python控制台。我下载了Pydev,设置了一个解释器和一个PYTHONPATH并导入了一个项目,一个Django项目。我将项目设置为Django的一个,然后我尝试右键单击带有Django环境的Project-->Django-->Shell。 我在控制台中出现了以下错误: 如果我尝试打开一个Windows shell并执行通常的python manage.py sh

    • 对于任何项目来说,版本控制都是很重要的一个方面。Atom集成了一些基本的Git和Github功能。 检出(checkout)HEAD中的版本 cmd-alt-Z快捷键检出当前文件在HEAD中的版本。 这是一个快捷的方法,来撤销所有你保存的或者阶段性的修改,并且把你的文件还原到HEAD中(最后提交)的版本。这从本质上相当于使用命令行在path中执行git checkout HEAD -- <path

    • 本文向大家介绍Python中的流程控制详解,包括了Python中的流程控制详解的使用技巧和注意事项,需要的朋友参考一下 流程控制无非就是if else之类的控制语句,今天我们来看一下Python中的流程控制会有什么不太一样的地方。 while语句 python中的while语句和其他语言没有什么不一样,我使用while语句来编写一个斐波拉赫数列: if 语句 python中的 if 可以和 eli