当前位置: 首页 > 知识库问答 >
问题:

ApacheFlink和Kinesis分析:java。lang.IllegalArgumentException:要分配的内存部分不应为0

陈阳舒
2023-03-14

背景:我一直试图在部署在运动分析运行时的同一个flink应用程序中设置BATCH STREAMING。流部分工作正常,但我有麻烦添加支持BATCH。

Flink:处理数据早于应用程序水印的密钥流

Apache Flink:数据流API的批处理模式失败,但“非法状态异常:排序输入不允许检查点”除外。'

逻辑是这样的:

The logic is something like this :

streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
streamExecutionEnvironment.fromSource(FileSource.forRecordStreamFormat(new TextLineFormat(), path).build(),
WatermarkStrategy.noWatermarks(),
"Text File")
.process(process function which transforms input)
.assignTimestampsAndWatermarks(WatermarkStrategy
                .<DetectionEvent>forBoundedOutOfOrderness(orderness)
                .withTimestampAssigner(
                        (SerializableTimestampAssigner<Event>) (event, l) -> event.getEventTime()))
.keyBy(keyFunction)
.window(TumblingEventWindows(Time.of(x days))
.process(processWindowFunction);

这样做,我得到了以下例外:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_90bea66de1c231edf33913ecd54406c1_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
    ... 10 more
Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:306)
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:426)
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 12 more
Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via `taskmanager.memory.managed.consumer-weights`.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
    at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:672)
    at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:653)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:521)
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:302)
    ... 17 more

似乎运动分析不允许客户端定义flink-conf.yaml文件来定义taskmanager.memory.managed.consumer权重。有没有办法绕过这个?

共有1个答案

景震博
2023-03-14

我不清楚这个异常的根本原因是什么,也不清楚如何在KDA上进行批次处理作业。

你可以试试这个(但我不确定KDA是否允许):

java prettyprint-override">Configuration conf = new Configuration();
conf.setString("taskmanager.memory.managed.consumer-weights", "put-the-value-here");

StreamExecutionEnvironment env =
  StreamExecutionEnvironment.getExecutionEnvironment(conf);
 类似资料:
  • 我有一个一直在思考的问题。以这个特殊的类为例 假设我有一个B类,它拥有一个使用listOne读取详细信息的方法。要查看数组列表,我需要首先获取列表的大小,以便我的代码知道数组列表何时结束。有两种方法可以做到这一点,一种是 或者我也可以用 在内存和效率方面,哪种方法更好?此外,假设我正在递归地读取一个非常大的数组。为了简单起见,让我们假设递归读取此数组将导致堆栈溢出异常。在这种情况下,第一个方法在理

  • 我有一个非常消耗内存的程序,想使用-Xmx2048m为java jvm分配更多的最大内存。但在启动时,我得到一个“无法为对象堆保留足够的空间”错误。 当尝试多个值并发现我的最大值是时,它就开始了。但我喜欢分配2GB。在使用-Xmx1560m启动java进程后,我有6GB的物理内存,Taskmanager显示3400MB可用。 有人知道我为什么不能分配2GB吗?

  • 问题内容: 是局部变量,将其存储在堆或堆栈中的何处? 问题答案: 在堆上。每当您用来创建对象时,它都会在堆上分配。

  • 我在运行OSX 10.13.6的Mac上有PHP版本7.2.9。如果我加载phpinfo(),我在Safari中看到memory_limit=256M。然而,当我看php.ini(/usr/本地/php5/lib/php.ini)memory_limit=128M。这种差异的原因是什么——显然限制是在其他地方设定的,但是在哪里?我需要增加内存限制

  • 问题内容: 类B继承了类A。现在,当我们创建类型B的对象时,为B分配的内存是多少?是否包括A和B或任何其他内存分配过程? 问题答案: 当创建对象B时,假设调用了默认构造函数 然后,JVM分配具有更多或更少内容的对象: 在B中显式声明的每个字段都有足够的内存(每个字段通常大约4-8字节,但是类型和主机系统之间有很大差异) 对于A及其祖先继承的每个最终字段,都有足够的内存 足够的内存来包含对调度向量的

  • 问题内容: 这是一篇受此评论启发的帖子,内容涉及如何在CPython中为对象分配内存。最初,这是在创建列表并将其添加到for循环中_以_ 实现列表理解的上下文中。 所以这是我的问题: CPython中有多少个不同的分配器? 每个功能是什么? 什么时候被正式称为?(根据此评论中的内容,列表理解可能不会导致调用, python在启动时会为其分配多少内存? 是否有规则来控制哪些数据结构在此存储器上首先获