当前位置: 首页 > 知识库问答 >
问题:

Flink:一个程序中的数据集和数据流API。有可能吗?

仇征
2023-03-14

我想首先使用dataset API操作静态数据,然后使用DataStream API运行流作业。如果我在IDE上编写代码,它工作得很好。但是当我尝试在本地flink jobmanager上运行时(全部并行性1),流式代码从未执行过!

例如,以下代码不起作用:

val parallelism = 1

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(parallelism)

val envStatic = ExecutionEnvironment.getExecutionEnvironment
envStatic.setParallelism(parallelism)

val myStaticData = envStatic.fromCollection(1 to 10)
val myVal: Int = myStaticData.reduce(_ + _).collect().head

val theStream = env.fromElements(1).iterate( iteretion => {
  val result = iteretion.map(x => x + myVal)
  (result, result)
})
theStream.print()
env.execute("static and streaming together")

执行计划:计划似乎是一个循环。

共有1个答案

闻人嘉悦
2023-03-14

如果Flink作业由多个子作业组成,例如由计数收集打印触发,则无法通过web界面提交作业。web界面只支持单个Flink作业。

 类似资料:
  • 我一直在关注Flink 1.14中针对有界数据的不同全局数据排序选项。我发现Stackoverflow和其他网站上关于这个的很多问题都是好几年前的问题了,关于不推荐使用的API或者没有完全回答这个问题。由于Flink正在快速发展,我想问一下最新稳定的Flink (1.14)中的可用选项。 以下是我如何理解当前的情况(这可能是错误的)。我的问题也附上。Flink 有两个 API —— 和 , 它们可

  • 我正在使用Flink表API,使用Java将数据集转换为数据流....以下是我的代码: ExpressionException:JavaStreamingTranslator的根无效:Root(ArraySeq((related_value,Double),(ref_id,String)))。您尝试将基于数据集的表转换为数据流吗?我想知道我们如何使用Flink表API将DataSet转换为Data

  • 我正在编写一个Flink流程序,其中我需要使用一些静态数据集(信息库,IB)来丰富用户事件的数据流。 对于例如。假设我们有一个买家的静态数据集,并且我们有一个事件的clickstream,对于每个事件,我们要添加一个布尔标志,指示事件的实施者是否是买家。 另一个选择可以是使用托管操作员状态来存储购买者设置,但是我如何保持按用户id分配的该状态,以避免在单个事件查找中使用网络I/O呢?在内存状态后端

  • 我们希望将迭代与Async IO运算符结合使用,为同一事件执行顺序API调用。但是,在回答我提出的另一个问题时,有人提到使用Datastreams唱迭代是个坏主意。 管理使用大量内存的状态-从存储中查询 有人能进一步解释一下吗?

  • Flink是否支持数据集中的侧输出功能(批处理Api)?如果没有,从文件加载时如何处理有效和无效记录?

  • 我有一个(相当复杂的)数据类型: 现在我发现自己需要另一个数据类型…有两个构造函数。一个与的相同;另一个只存储一个。我有什么选择? 虽然这会起作用,但它也允许类似这样的东西,这是没有意义的。