继续:Flink:处理数据早于应用程序水印的密钥流
基于这个建议,我一直在尝试在使用Datastream API的同一个Flink应用程序中添加对批处理的支持。
逻辑是这样的:
streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
streamExecutionEnvironment.readTextFile("fileName")
.process(process function which transforms input)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<DetectionEvent>forBoundedOutOfOrderness(orderness)
.withTimestampAssigner(
(SerializableTimestampAssigner<Event>) (event, l) -> event.getEventTime()))
.keyBy(keyFunction)
.window(TumblingEventWindows(Time.of(x days))
.process(processWindowFunction);
基于公共文档,我的理解是,我只需要将源更改为有界的。然而,上述处理在窗口化步骤后的事件触发器处继续失败,但有以下例外:
java.lang.IllegalStateException: Checkpointing is not allowed with sorted inputs.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.init(OneInputStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:552)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
at java.base/java.lang.Thread.run(Thread.java:829)
输入文件包含多个键的历史事件。给定键的数据已排序,但整体数据未排序。我还在每个键的末尾添加了一个事件,时间戳=MAX_WATERMARK以指示键控流的结束。我也尝试了一个单键,但处理失败了,同样的例外。
注意:我没有启用检查点。我还尝试过显式禁用检查点,但没有效果。
env.getCheckpointConfig().disableCheckpointing();
编辑-1
添加更多细节:我尝试更改和使用FileSource读取文件,但仍然得到相同的异常。
environment.fromSource(FileSource.forRecordStreamFormat(new TextLineFormat(), path).build(),
WatermarkStrategy.noWatermarks(),
"Text File")
第一个流程步骤和密钥拆分工作正常。然而,在那之后它失败了。我尝试删除窗口并添加一个简单的过程步骤,但仍然失败。没有明确的接收器。最后一个进程函数只是更新数据库。
有什么我遗漏的吗?
只有启用检查点时才能引发该异常。也许您可以在flink-conf.yaml中配置一个检查点间隔?
主程序正在消费kafka事件,然后过滤- 但是我得到了以下例外: 以下是flink-conf.yaml中的一些配置 任何想法为什么会发生异常以及如何解决问题? 谢谢
我在spark streaming应用程序中看到一些失败的批处理,原因是与内存相关的问题,如 无法计算拆分,找不到块输入-0-1464774108087
我对闪身是个新手。我正在尝试在我的应用程序中启用检查点和状态。我从Flink文档中看到了我们是如何存储键控状态的。但是我想知道我们是否可以存储非键控状态(的状态)
我有jsp和html页面的应用程序 说明服务器遇到一个内部错误,使其无法满足此请求。 例外 NestedServletException:处理程序处理失败;嵌套异常是java.lang.NosuchMethoderror:javax.servlet.http.HttpServletResponse.getHeader(ljava/lang/string;)ljava/lang/string;rig
请帮助我更好地学习Spring 3 MVC的基础知识,我试图学习Spring JSR303:Bean验证,但根本无法解决以下问题,我已经花了一天多的时间来解决这个问题:( 我想要一个简单的验证在这里工作。jsp中的name、password和email字段不能留空,这是我们的目标。到目前为止,每次尝试将所有字段都为空提交hello.jsp时,都会遇到以下错误 HTTP状态500-处理程序处理失败;
问题内容: 我想知道与使用if语句先检查null相比,使用try / exception处理null的成本。 提供更多信息。因为在此应用程序中,有> 50%的机会获得空值。如果未输入任何数据,则通常为null。因此,尝试使用null进行计算是很平常的事。 话虽这么说,如果我先使用if语句在计算之前先检查null并且不首先尝试计算是否会提高性能,或者仅抛出异常并对其进行处理会更便宜? 感谢您的任何建