当前位置: 首页 > 知识库问答 >
问题:

Flink:流拓扑中未定义运算符。无法执行

巫培
2023-03-14

我正试图设置一个非常基本的闪回工作。当我尝试运行时,得到以下错误:

Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1535)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:53)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
    at com.test.flink.jobs.TestJobRunnable$.run(TestJob.scala:223)

错误由以下代码引起:

val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val messageStream = streamExecutionEnvironment.addSource(kafkaConsumer)
messageStream.keyBy(_ => "S")

streamExecutionEnvironment.execute("Test Job")

当我向流的末尾添加print()调用时,错误消失了:

val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val messageStream = streamExecutionEnvironment.addSource(kafkaConsumer)
messageStream.keyBy(_ => "S")
messageStream.print()

streamExecutionEnvironment.execute("Test Job")

我不明白为什么print()可以解决这个问题。在引入接收器之前,流拓扑不会处理其任何操作符吗?print()?任何帮助都将不胜感激。谢谢


共有1个答案

蓟辰沛
2023-03-14

在程序设计语言理论中,延迟求值或按需调用是一种求值策略,它将表达式的求值延迟到需要它的值时,同时避免重复求值。懒惰评价的对立面是急切评价,有时也被称为严格评价。懒惰评估的好处包括:

  • 将控制流(结构)定义为抽象而不是原语的能力
  • 定义潜在无限数据结构的能力。这允许更直接地实现某些算法
  • 通过避免不必要的计算和在计算复合表达式时避免错误条件,性能得以提高

懒惰的计算会减少内存占用,因为值是在需要时创建的。然而,由于操作的顺序变得不确定,懒惰评估很难与异常处理和输入/输出等命令式功能相结合。

通常,Flink将操作分为两类:转换操作和接收器操作。正如您所猜测的,Flink转换是惰性的,这意味着在调用接收器操作之前不会执行它们。

Flink程序是在分布式集合上实现转换的常规程序(例如,筛选、映射、更新状态、加入、分组、定义窗口、聚合)。集合最初是从源创建的(例如,通过读取文件、Kafka主题或本地内存中的集合)。结果通过接收器返回,例如,接收器可以将数据写入(分布式)文件或标准输出(例如,命令行终端)。

 类似资料:
  • 我正在浏览Apache Flink的基本WordCount示例。这是代码: 当我尝试在群集中部署此作业时,请使用: 我得到这个例外: 我不明白为什么,因为我是Flink的新手。请帮助我理解这个问题。谢谢你。 当我尝试直接从IDE运行此代码而不将JAR部署到集群时,它完全可以正常工作。

  • 自定义拓扑 Mininet 提供了 Python API,可以用来方便的自定义拓扑结构。 在 mininet/custom 目录下给出了几个例子。例如在 topo-2sw-2host.py 文件中定义了一个 mytopo,则可以通过 --topo 选项来指定使用这一拓扑,命令为 sudo mn --custom ~/mininet/custom/topo-2sw-2host.py --topo m

  • 我正在使用相同版本的petrel 0.9.3和apache storm。当我尝试运行拓扑时,会出现以下错误:

  • 问题内容: 我只是试图制作一个简单的类,让我弄清楚文件的长度: 我遇到了一个问题 我得到错误: 未为参数类型定义运算符!= int,null 有什么想法为什么会阻止这种情况吗? 问题答案: Java中的原始类型不能为。如果要检查0,请执行。

  • 在准备拓扑优化时,我偶然发现了以下几点: 目前,Kafka Streams在启用时会执行两种优化: 1-源KTable将源主题重新用作变更日志主题。 2-如果可能,Kafka流会将多个重新分区主题压缩为单个重新分区主题。 这个问题是关于第一点的。我不完全明白这里发生了什么。只是为了确保我没有在这里做任何假设。有人能解释一下,以前是什么状态吗: 1-KTable使用内部变更日志主题吗?如果是,有人能