当前位置: 首页 > 知识库问答 >
问题:

Flink-多源集成测试

康锦
2023-03-14

我有一个Flink作业,我正在使用这里描述的方法进行集成测试:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing

作业从两个源获取输入,这两个源组合在一个coflatmapFunction中。在测试环境中,我目前使用两个简单的SourceFunction来发出值,但是这不提供对事件发出顺序的任何控制。这是正确测试作业功能所必需的。

如何修改测试以确保一个源函数在第二个源函数之前发出其所有数据?

我在Flink中看到了复杂拓扑(多输入)集成测试中建议的方法,这对于单元测试很好,但我正在寻找一种解决方案,允许我集成测试整个工作。

共有1个答案

邵骁
2023-03-14

我建议将控制代码添加到两个SourceFunctions中,并使用MiniClusterWithClientResource。它可以如下所示:

public class JobITCase {

    private static final int NUM_TMS = 2;
    private static final int NUM_SLOTS = 2;
    private static final int PARALLELISM = NUM_SLOTS * NUM_TMS;

    @ClassRule
    public final static MiniClusterWithClientResource MINI_CLUSTER_WITH_CLIENT_RESOURCE = new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setNumberSlotsPerTaskManager(NUM_SLOTS)
                .setNumberTaskManagers(NUM_TMS)
                .build());

    @Test
    public void testJob() throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);

        final MyControllableSourceFunction source1 = new MyControllableSourceFunction("source1");
        final MyControllableSourceFunction source2 = new MyControllableSourceFunction("source2");

        final DataStreamSource<Integer> input1 = env.addSource(source1);
        final DataStreamSource<Integer> input2 = env.addSource(source2);

        input1.connect(input2).map(new CoMapFunction<Integer, Integer, Integer>() {
            @Override
            public Integer map1(Integer integer) {
                System.out.println("Input 1: " + integer);
                return integer;
            }

            @Override
            public Integer map2(Integer integer) {
                System.out.println("Input 2: " + integer);
                return integer;
            }
        }).print();

        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();

        MINI_CLUSTER_WITH_CLIENT_RESOURCE.getMiniCluster().submitJob(jobGraph).get();

        final CompletableFuture<JobResult> jobResultFuture = MINI_CLUSTER_WITH_CLIENT_RESOURCE.getMiniCluster().requestJobResult(jobGraph.getJobID());

        final ArrayList<CompletableFuture<Void>> finishedFutures = new ArrayList<>(PARALLELISM);

        for (int i = 0; i < PARALLELISM; i++) {
            MyControllableSourceFunction.startExecution(source1, i);
            finishedFutures.add(MyControllableSourceFunction.getFinishedFuture(source1, i));
        }

        FutureUtils.waitForAll(finishedFutures).join();

        for (int i = 0; i < PARALLELISM; i++) {
            MyControllableSourceFunction.startExecution(source2, i);
        }

        jobResultFuture.join();
    }

    private static class MyControllableSourceFunction extends RichParallelSourceFunction<Integer> {

        private static final ConcurrentMap<String, CountDownLatch> startLatches = new ConcurrentHashMap<>();
        private static final ConcurrentMap<String, CompletableFuture<Void>> finishedFutures = new ConcurrentHashMap<>();

        private final String name;

        private boolean running = true;

        private MyControllableSourceFunction(String name) {
            this.name = name;
        }

        @Override
        public void run(SourceContext<Integer> sourceContext) throws Exception {
            final int index = getRuntimeContext().getIndexOfThisSubtask();

            final CountDownLatch startLatch = startLatches.computeIfAbsent(getId(index), ignored -> new CountDownLatch(1));
            final CompletableFuture<Void> finishedFuture = finishedFutures.computeIfAbsent(getId(index), ignored -> new CompletableFuture<>());

            startLatch.await();
            int counter = 0;

            while (running && counter < 10) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(counter++);
                }
            }

            finishedFuture.complete(null);
        }

        @Override
        public void cancel() {
            running = false;
        }

        private String getId(int index) {
            return name + '_' + index;
        }

        static void startExecution(MyControllableSourceFunction source, int index) {
            final CountDownLatch startLatch = startLatches.computeIfAbsent(source.getId(index), ignored -> new CountDownLatch(1));
            startLatch.countDown();
        }

        static CompletableFuture<Void> getFinishedFuture(MyControllableSourceFunction source, int index) {
            return finishedFutures.computeIfAbsent(source.getId(index), ignored -> new CompletableFuture<>());
        }
    }
}
 类似资料:
  • 例如,我想用和测试Kafka/Flink的集成。 该过程将是: 与Flink一起阅读Kafka主题 用Flink进行一些操作 和Flink一起写另一个Kafka主题 以字符串为例,从输入主题中读取字符串,转换为大写,写入新主题。 问题是如何测试流量? 当我说测试时,这是单元/集成测试。 谢谢!

  • 我正在尝试将Apache Kafka 2.11-0.10.0.0与Apache Flink 1.1.2集成。我正在使用scalashell来测试它,我得到了以下错误。 类别组织。阿帕奇。Flink。流式处理。api。检查点。未找到检查点通知程序 我已经添加了组织。阿帕奇。Flink。将jar流式传输到类路径,但这没有帮助。我一直导入到org。阿帕奇。Flink。流式处理。api。检查点。\u。这仍

  • 问题内容: 我正在尝试将Flink与Elasticsearch 2.1.1集成,我正在使用Maven依赖项 这是我从Kafka队列中读取事件的Java代码(工作正常),但是无论如何,如果我更改了任何相关设置,则事件不会在Elasticsearch中发布,也没有错误,在以下代码中到ElasticSearch的端口,主机名,集群名称或索引名称,然后立即看到错误,但当前它不显示任何错误,也没有在Elas

  • 主要内容:集成测试背后的原因,集成测试技术,集成测试方法,集成测试指南集成测试是单元测试后软件测试过程的第二个层次。在此测试中,软件的单元或单个组件在组中进行测试。集成测试级别的重点是在集成组件或单元之间交互时暴露缺陷。 单元测试使用模块进行测试,这些模块在集成测试中进行组合和测试。该软件使用许多软件模块开发,这些软件模块由不同的编码器或程序员编码。集成测试的目标是检查所有模块之间通信的正确性。 集成测试背后的原因 虽然软件应用程序的所有模块已经在单元测试中进行了测

  • 设计 集成测试包括 3 个模块:测试用例、测试环境以及测试引擎。 测试用例 用于定义待测试的 SQL 以及测试结果的断言数据。 每个用例定义一条 SQL,SQL 可定义多种数据库执行类型。 测试环境 用于搭建运行测试用例的数据库和 ShardingSphere-Proxy 环境。 环境又具体分为环境准备方式,数据库类型和场景。 环境准备方式分为 Native 和 Docker,未来还将增加 Emb

  • 英文原文:http://emberjs.com/guides/testing/integration/ 集成测试通常用来测试应用中得重要工作流。集成测试用来模拟用户交互和确认交互结果。 设置 为了对Ember应用进行集成测试,需要在测试框架中运行应用。首先需要将根元素(root element)设置为任意一个已知将存在的元素。如果根元素在测试运行时可见的话,这对测试驱动开发非常有用,带来的帮助非常