当前位置: 首页 > 知识库问答 >
问题:

为什么flink会停止我的流应用?

松铭
2023-03-14

我的代码使用readTextFile读取日志文件,当我在Flink(< code >/opt/Flink-1 . 0 . 3/bin/Flink run-m yarn-cluster-yn 2/home/Flink/Flink-JSON-0.1 . jar )中运行jar时,它成功处理了里面的行,并停止了我的应用程序,而不是等待新的行。我做这件事需要一些参数吗?

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("hdfs:///test/ignicion.io")

先谢谢你了

共有1个答案

程俊健
2023-03-14

您正在寻找

StreamExecutionEnvironment.readFileStream(String filePath, long intervalMillis, WatchType watchType) 

对于 WatchType,您有以下选项

    < li >仅_新_文件, < li>REPROCESS_WITH_APPENDED, < li > PROCESS _ ONLY _ APPENDED

来自…的溪流

StreamExecutionEnvironment.readTextFile(String filePath, String charsetName)

将在读取所有文件后完成。我认为,它主要是在开发过程中进行局部测试。

 类似资料:
  • 问题内容: 基本上,一切似乎都可以正常运行并启动,但是由于某些原因,我无法调用任何命令。我已经很轻松地环顾了一个小时,然后看了一些示例/观看视频,但我终生无法找出问题所在。代码如下: 我在中拥有的调试输出实际上可以正常工作并做出响应,并且整个bot都可以运行,没有任何异常,但是它只是不会调用命令。 问题答案: 覆盖提供的默认值将禁止运行任何其他命令。要解决此问题,请在的末尾添加一行。例如: 默认值

  • 我在使用Spring 3.1.1,在WAS8.5环境中,使用Spring的@计划功能每天每8小时运行一次任务。它开始并运行了一段时间,然后无缘无故地停止。我的日志中没有任何内容表明失败。知道是什么导致了这种情况吗?现在已经发生过几次了。这不是由于服务器重启等原因。 我的应用程序上下文。xml 我的调度程序。属性 我知道我可以在一行上每8小时设置一次CRON parm,但是用户可以在他们想要的方面获

  • 我想根据id加入Customer和Address对象。这些是我对kafka stream for Customer主题的输入 和以下fro地址 我使用了间隔连接以及使用TumblingEventTimeWindows和滑动窗口的JoinFunction,但它没有连接客户和地址流。我不明白我在代码中遗漏了什么。

  • 我有一个在dev环境下运行的grails应用程序,我毫无问题地完成了这场战争 在服务器中,我使用“sudo service tomcat7 stop”命令停止服务,并将war文件放入服务器文件夹“/var/lib/tomcat7/webapps”,然后使用命令“sudo service tomcat7 start”再次启动服务器。为了检查服务器的状态,我运行“sudo service tomcat

  • 我在Ubuntu 14.04上安装了redis,而且我似乎几乎每周都有完成RDB快照的问题。Redis版本是3.0.4 64位。 3838:M 24 Feb 09:46:28.826*后台保存成功终止 3838:M 24 Feb 09:47:29.088*在60秒内更改100000次。拯救 3838:M 24 Feb 09:47:29.230*后台保存由pid 17281启动 17281:信号处理

  • 我正在尝试为Flink流媒体作业创建JUnit测试,该作业将数据写入kafka主题,并分别使用和从同一kafka主题读取数据。我正在通过生产中的测试数据: 以及检查来自消费者的数据是否与以下数据相同: 使用。 通过打印流,我能够看到来自消费者的数据。但无法获得Junit测试结果,因为即使消息完成,使用者仍将继续运行。所以它并没有来测试这个部件。 在或中是否有任何方法停止进程或运行特定时间?