当前位置: 首页 > 知识库问答 >
问题:

Apache Flink Wordcount:流拓扑中未定义运算符

齐迪
2023-03-14

我正在浏览Apache Flink的基本WordCount示例。这是代码:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class StreamingJob {

    public static void main(String[] args) throws Exception {

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // get input data
        DataSet<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,"
        );

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new LineSplitter())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        // execute and print result
        counts.print();
    }
}

final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}

当我尝试在群集中部署此作业时,请使用:

./bin/flink run ../quickstart-0.1.jar

我得到这个例外:

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:2106)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2083)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2069)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1915)
    at org.myorg.quickstart.StreamingJob.main(StreamingJob.java:62)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 8 more

我不明白为什么,因为我是Flink的新手。请帮助我理解这个问题。谢谢你。

当我尝试直接从IDE运行此代码而不将JAR部署到集群时,它完全可以正常工作。

共有1个答案

史弘致
2023-03-14

这里有些不对劲。日志与代码不匹配,并且代码不是流式作业的代码——这是一个批处理作业。这个例子来自哪里?

不过,代码看起来确实应该可以工作——您确定它失败了吗?输出将写入集群日志目录中的文件。

 类似资料:
  • 我正试图设置一个非常基本的闪回工作。当我尝试运行时,得到以下错误: 错误由以下代码引起: 当我向流的末尾添加调用时,错误消失了: 我不明白为什么可以解决这个问题。在引入接收器之前,流拓扑不会处理其任何操作符吗?

  • 自定义拓扑 Mininet 提供了 Python API,可以用来方便的自定义拓扑结构。 在 mininet/custom 目录下给出了几个例子。例如在 topo-2sw-2host.py 文件中定义了一个 mytopo,则可以通过 --topo 选项来指定使用这一拓扑,命令为 sudo mn --custom ~/mininet/custom/topo-2sw-2host.py --topo m

  • 在准备拓扑优化时,我偶然发现了以下几点: 目前,Kafka Streams在启用时会执行两种优化: 1-源KTable将源主题重新用作变更日志主题。 2-如果可能,Kafka流会将多个重新分区主题压缩为单个重新分区主题。 这个问题是关于第一点的。我不完全明白这里发生了什么。只是为了确保我没有在这里做任何假设。有人能解释一下,以前是什么状态吗: 1-KTable使用内部变更日志主题吗?如果是,有人能

  • 问题内容: 我只是试图制作一个简单的类,让我弄清楚文件的长度: 我遇到了一个问题 我得到错误: 未为参数类型定义运算符!= int,null 有什么想法为什么会阻止这种情况吗? 问题答案: Java中的原始类型不能为。如果要检查0,请执行。

  • 我终于觉得我有了一个在redis数据库上写的toopology。我有一个插销要打印,还有一个插销要插入Redis。但当我尝试启动拓扑时,会出现以下错误: