当前位置: 首页 > 知识库问答 >
问题:

如何从程序中停止flink流作业

缪修德
2023-03-14

我正在尝试为Flink流媒体作业创建JUnit测试,该作业将数据写入kafka主题,并分别使用FlinkKafkaProducer09FlinkKafkaConsumer09从同一kafka主题读取数据。我正在通过生产中的测试数据:

DataStream<String> stream = env.fromElements("tom", "jerry", "bill");

以及检查来自消费者的数据是否与以下数据相同:

List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result =  resultSink.getResult();
assertEquals(expected, result);

使用TestListResultSink

通过打印流,我能够看到来自消费者的数据。但无法获得Junit测试结果,因为即使消息完成,使用者仍将继续运行。所以它并没有来测试这个部件。

FlinkFlinkKafkaConsumer09中是否有任何方法停止进程或运行特定时间?

共有3个答案

宋飞文
2023-03-14

在您的测试中,您可以在单独的线程中启动作业执行,等待一段时间以允许它进行数据处理,取消线程(它将中断作业)并进行关联。

CompletableFuture<Void> handle = CompletableFuture.runAsync(() -> {
    try {
        environment.execute(jobName);
    } catch (Exception e) {
        e.printStackTrace();
    }
});
try {
    handle.get(seconds, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    handle.cancel(true); // this will interrupt the job execution thread, cancel and close the job
}

// Make assertions here
冯枫
2023-03-14

您不能在反序列化程序中使用isEndOfStream覆盖来停止从Kafka提取吗?如果我读得正确,flink/Kafka09Fetcher的run方法中有以下代码,这会中断事件循环

    if (deserializer.isEndOfStream(value)) {
                        // end of stream signaled
                        running = false;
                        break;
                    }

我的想法是使用Till Rohrmann关于控制消息的想法,并结合这个isEndOfStream方法来告诉Kafka消费者停止阅读。

有什么理由不起作用吗?或者是一些我忽略的角落案例?

https://github.com/apache/flink/blob/07de86559d64f375d4a2df46d320fc0f5791b562/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L146

夏侯腾
2023-03-14

潜在的问题是,流媒体节目通常不是有限的,而且运行无限期。

最好的方法,至少目前是这样,是在流中插入一个特殊的控制消息,让源正确终止(只需通过离开读取循环停止读取更多数据)。这样,Flink将告诉所有下游操作员,他们可以在消耗完所有数据后停止。

或者,您可以在源中抛出一个特殊的异常(例如,在一段时间后),这样您就可以通过检查错误原因来区分“正确”终止和失败情况。在源代码中抛出异常将使程序失败。

 类似资料:
  • 问题内容: 我有以下代码: 在发生某些事件之后,我应该停止在的方法中声明的操作,该方法实现。 我怎样才能做到这一点?我无法关闭执行器,只能撤消我的定期任务。我可以用吗?如果可以的话,请告诉我它将如何工作。 问题答案: 使用。该是你的任务的处理。您需要取消此任务,它将不再执行。 实际上,是签名,并将其与参数一起使用将导致当前正在运行的执行线程被调用中断。如果线程在阻塞的可中断调用(例如)中等待,则会

  • 我已经使用Spring云数据流来控制一些批处理作业。在SCDF中,我定义了一些任务后,它们作为运行状态的作业启动。当我试图停止一项特定的工作时,它并没有立即停止。我发现该工作仍在运行,直到它完成了当前的步骤。 例如,我的作业'abc'有2个步骤A和B。在SCDF中,当步骤A正在执行,而作业'abc'仍在运行时,我停止作业'abc',直到步骤A完成,并且它没有实现步骤B。 那么,有什么方法可以从Sp

  • 我有一个简单的控制台应用程序,有时需要执行图形操作,对于那些我使用JavaFx框架(有一些功能,我需要像css样式的文本)我只是生成一些形状和文本到一个隐藏的场景,然后保存在文件中仅此而已, 我知道要使用JavaFx,我必须将图形操作传递给JavaFx线程,但是当一切都完成后,我必须关闭应用程序(几个小时后),这个JavaFx线程仍然保持打开...我真的不想强行退出System.exit(),因为

  • 有一个很奇怪的问题...有一次我打开我的项目,并注意到我没有机会从工作室推出app...黑色三角形唱罩禁用 但是,如果我构建apk文件并使用adb命令安装它,它就可以工作。 我已经尝试删除和下载新的Android Studio,但没有任何改变... 我的插件 我的配置

  • 问题内容: 我有一个侦听端口的简单TCP服务器。 我先将其启动,然后在Mac上使用Ctrl + Z将其关闭。当我尝试再次运行它时,出现以下错误消息: 我是否以错误的方式关闭程序?如何防止这种情况发生? 问题答案: 要结束程序,您应该使用+ 。如果这样做,它会发送,从而允许程序正常结束,并与正在侦听的任何端口解除绑定。 另请参阅:https : //superuser.com/a/262948/48