当前位置: 首页 > 知识库问答 >
问题:

在一个闪烁作业中使用收集()和env.execute()

司徒隐水
2023-03-14

我试图在Flink中编写一个需要两个阶段的计算。

在第一阶段,我创建一个Graph并获取它的顶点id:

List<String> ids = graph.getVertexIds().collect();

在第二阶段,我想使用这些ID为每个顶点运行SingleSourceShortestPath。

for (String id: ids){
        System.out.println("Source Id: "+id);
        graph.run( new SingleSourceShortestPaths<String, String>(id, 10)).print();
    }

它在本地工作(在IntelliJ IDE和命令行中使用./bin/flink run...),但当我使用其WebUI在Flink上提交作业时,程序只是执行直到收集()方法并且不运行程序的剩余部分(用于语句和print())。

问题是什么?

这是我的代码:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.library.SingleSourceShortestPaths;

import java.util.ArrayList;
import java.util.List;

public class Main {
    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Edge<String, Double> e1 = new Edge<String, Double>("1", "2", 0.5);
        Edge<String, Double> e2 = new Edge<String, Double>("2", "3", 0.5);
        Edge<String, Double> e3 = new Edge<String, Double>("4", "5", 0.5);
        Edge<String, Double> e4 = new Edge<String, Double>("5", "6", 0.5);
        Edge<String, Double> e5 = new Edge<String, Double>("7", "8", 0.5);


        List<Edge<String, Double>> edgeList = new ArrayList<Edge<String, Double>>();
        edgeList.add(e1);
        edgeList.add(e2);
        edgeList.add(e3);
        edgeList.add(e4);
        edgeList.add(e5);


        Graph<String, String, Double> graph = Graph.fromCollection(edgeList,
                new MapFunction<String, String>() {
                    public String map(String value) {
                        return value;
                    }
                }, env);

        List<String> ids = graph.getVertexIds().collect();

        for (String id: ids){
            System.out.println("Source Id: "+id);
            graph.run( new SingleSourceShortestPaths<String, String>(id, 10)).print();
        }
    }
}

共有1个答案

龙正初
2023-03-14

基于此链接,Flink转换是懒惰的,这意味着在调用sink操作之前不会执行这些转换。

Flink中的sink操作触发流的执行,以生成程序所需的结果,例如将结果保存到文件系统或将其打印到标准输出

诸如<代码>数据集之类的方法。collect(),数据集。Count()和数据集。print()是触发实际数据转换的接收器操作。

 类似资料:
  • 我有一个使用Apache Flink(Flink版本:1.8.1)使用Scala进行流式处理的工作。flow作业要求如下:Kafka->写给Hbase->用不同的主题再次发送给Kafka 在向Hbase写入过程中,需要从另一个表中检索数据。为确保数据不为空(NULL),作业必须(在一定时间内)重复检查数据是否为空。 编辑:我的意思是,有了我在内容中描述的问题,我想过必须在作业流中创建某种类型的作业

  • 我尝试用elasticsearch(版本为6.0.0)Sink构建一个flink流式单词计数演示。不幸的是出现了以下错误。这似乎是依赖关系。 我的elasticsearch集群是6.0.0,flink依赖项如下 有关详细信息,此错误在ElasticSearch6ApicallBridge.java方法中触发 谢谢你

  • 我创建小应用程序来显示我的问题。您可以在https://github.com/anton111111/exampleglideblink中看到它。 当我调用RecyerView适配器上的notifyItemChanged时,我只需要更改文本(在我的示例中,它使用id r.id.progress EditText)和图像,而不需要更改。但它会眨眼。 我有能力不眨眼地更改文本吗?

  • 我正在更新作为ios 6上传到AppStore的应用程序,以适应IOS 7 SDK,我有一个UIViewController的问题,我看到UIViewController在另一个上面闪烁了一秒钟,然后我可以看到第二个我应该,我添加了这张图片: ViewController中间的橙色来自上一个,它只发生在IOS 7上。推送到下一个视图控制器的代码是:`-(iAction)goButtonPresse

  • 我是一个新的flink和即将载入我们的第一个生产版本。我们有一个数据流。状态筛选器正在检查数据是否为新数据。

  • 我想运行流作业。 当我尝试使用和Flink Web界面在本地运行该作业时,没有问题。 但是,我当前正在尝试使用Flink on YARN(部署在Google Dataproc上)运行我的作业,并且当我尝试取消它时,取消状态将永远持续,并且TaskManager中仍有一个插槽被占用。 这是我得到的日志: