当前位置: 首页 > 知识库问答 >
问题:

使用Apache Flink执行计划任务

卓俊晖
2023-03-14

我在parallelism 5上有一份flink的工作(目前!!)。其中一个richFlatMap流在打开(配置参数)方法中打开一个文件。在flatMap操作中,没有任何打开操作,它只是读取文件来搜索某些内容。(有一个实用程序类,它的方法类似于utilityClass.searchText(“abc”))。以下是样板代码:

public class MyFlatMap extends RichFlatMapFunction<...> {

    private MyUtilityFile myFile;

    @Override
    public void open(Configuration parameters) throws Exception {
        myFile.Open("fileLocation");
    }

    @Override
    public void flatMap(...) throws Exception {
        String text = myFile.searchText('abc');
        if (text != null) // take an action
        else // another action
    }

}

python脚本每天都会在特定时间更新此文件。因此,我还应该在flatMap流中打开新创建的文件(通过python脚本)。

我只是认为这可以通过schduledExecutorService只用一个线程池来完成。

我无法打开每个flatMap调用的文件,因为它太大了。

下面是我试图编写的样板代码:

public class MyFlatMap extends RichFlatMapFunction<...> implements Runnable {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    private MyUtilityFile myFile;

    @Override
    public void run() {
       myFile.Open("fileLocation");     
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        scheduler.scheduleAtFixedRate(this, 1, 1, TimeUnit.HOURS);

        myFile.Open("fileLocation");
    }

    @Override
    public void flatMap(...) throws Exception {
        String text = myFile.searchText('abc');
        if (text != null) // take an action
        else // another action
    }

}

此样板是否适用于Flink环境?如果不是,我如何以预定的方式打开文件?(没有“使用kafka更新文件发送事件并通过flink读取事件”等选项)

共有1个答案

白光耀
2023-03-14

也许您可以直接实现ProcessingTimeCallback接口,它支持计时器操作

public class MyFlatMap extends RichFlatMapFunction<...> implements ProcessingTimeCallback { 
    private MyUtilityFile myFile;

 
    @Override
    public void open(Configuration parameters) throws Exception {
        scheduler.scheduleAtFixedRate(this, 1, 1, TimeUnit.HOURS);

        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + 3600000, this);

        myFile.Open("fileLocation");
    }

    @Override
    public void flatMap(...) throws Exception {
        String text = myFile.searchText('abc');
        if (text != null) // take an action
        else // another action
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        myFile.Open("fileLocation");

        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + 3600000, this);
    }
}
 类似资料:
  • 我用SpringBoot创建了一个简单的演示应用程序,其中包括执行器。带有@Scheduled注释的任务显示在执行器中,但以编程方式启动的任务不会显示。有没有办法让他们也出现? 我已经注释了@Enable调度。 我的组件如下所示: 执行器的结果仅显示带注释的任务:

  • > 我需要在执行前一个任务后以不同的延迟一个接一个地运行计划的任务。示例。有一个任务列表和延迟列表。 现在我需要运行task1通过100ms,task2在task1之后通过9ms,task3在task2之后通过22ms等等。 我正在使用javafx。任务可以使用一些UI更新方法,例如更改节点位置。这迫使我使用平台。runLater()方法,因为如果不这样做,则会出现异常“Not on FX app

  • 我们在Spring web应用程序中使用预定任务来发送提醒、每日摘要等: 每个调度任务调用一个给定的服务方法(上面伪代码中的fooService.bar())。我想监控每次处决持续多长时间。随着负载、数据或复杂性的增加,其中一些方法可能需要更长的时间。我可以给每个服务方法添加日志记录语句(现在大约有10个,但将来可能会更多),或者使用方面给每个方法添加一些秒表行为。但是,对于spring中的所有预

  • 我有一个批处理(*. bat)文件,触发一个Python脚本,这个脚本需要大约25分钟来完成交互式(通过命令提示符手动)。这个批处理文件需要每天早上运行。 当我尝试在Windows任务调度器上将其设置为计划任务并在那里运行时,所用的时间几乎是交互时的两倍。即使我在xml中将优先级设置从默认的7设置为4(更高的优先级),也没有任何区别。更改优先级设置仅适用于I/O优先级,但不适用于内存优先级,内存优

  • 我正在从事一个Spring Webflux项目,在计划任务中发布和使用Flux时遇到了一个问题。 我配置的调度程序: 除非我在最后故意阻止,否则这项任务永远不会完成: 我最初没有费心直接引用发布/订阅计划程序,我尝试了所有看似合理但没有效果的选项。 我的日志事件发生了,但当来自调度程序的该任务的线程死亡时,通量也会被丢弃;即使在我指定发布和订阅行为后,它们应该在自己的线程池中? 我想使这个行动完全

  • 我在使用spring Boot开发的Web应用程序中有一个预定的任务。我在tomcat集群上运行它,所以在X小时,计划的任务从每个节点开始。 我读到了:https://github.com/lukas-krecan/shedlock,所以我跟着指南走了,但它不起作用..以下是我所做的: 我在POM中包含了这些依赖项: 然后我把这个添加到我的方法中: 如何避免使用spring Boot同时执行多次任