Question:

Extremely poor performance of local Flink execution

长孙燕七
2023-03-14

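I am currently porting some of my company's algorithms to a Flink application so that they can run as streams in the future. To test these algorithms, I read existing data from CSV files and then create a stream with flink-spector. These data sets typically contain about 10,000 data points, each consisting of a timestamp and an integer value.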

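My problem is that the Flink application takes a very long time (about half an hour) to process this data, which should easily be done in a few seconds, and I cannot figure out why.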

This is what my code looks like:

public class MyAlgorithmTest extends DataStreamTestBase {

    @Test
    public void testMyAlgorithm() {

        DataStreamSource<MyData> myDataStream = 
            createTestStream(getEventTimeInputBuilder("MyData.csv"));

        DataStream<MyData> avgDataStream = myDataStream
            .keyBy(value -> value.getUniqueId())
            .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30)))
            // aggregate data over windows of one minute
            .apply(new MyDataAggregator())
            .keyBy(value -> value.getUniqueId())
            .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(150)))
            // calculate the moving average over windows of five minutes
            .apply(new MovingAvgWindowFunction<>());
    }
}
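
For reference, the window configuration above can be checked with a little arithmetic. The following is a minimal sketch in plain Java, not Flink's own code: it assumes the usual sliding-window semantics (windows aligned to multiples of the slide, zero offset, non-negative timestamps) and lists the windows that a single element falls into. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowSketch {

    /** Start timestamps (ms) of every sliding window that contains the given event timestamp. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is <= timestamp (assumes zero offset, non-negative timestamps).
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // One-minute windows sliding every 30 s, as in the first window stage.
        System.out.println(windowStarts(95_000L, 60_000L, 30_000L));
        // Five-minute windows sliding every 150 s, as in the second window stage.
        System.out.println(windowStarts(95_000L, 300_000L, 150_000L));
    }
}
```

With both configurations the size is twice the slide, so under these assumptions each element is assigned to two windows per stage. For about 10,000 elements that is on the order of 20,000 window assignments per stage, which by itself does not look like enough work to explain a half-hour run.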

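The job is deployed successfully on the local machine (unfortunately I cannot post the full output here). This is part of the output from the first few seconds: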

11:38:22,138 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - Job cd666d998a392d0907d5522babc80342 was successfully submitted to the JobManager akka://flink/deadLetters.
11:38:22,139 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Scheduling job cd666d998a392d0907d5522babc80342 (Flink Streaming Job).
11:38:22,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (cd666d998a392d0907d5522babc80342) switched from state CREATED to RUNNING.
11:38:22,142 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Collection Source (1/1) (712aa1b16f98f6a44ec52c60ed920a1c) switched from CREATED to SCHEDULED.
11:38:22,146 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - 09/21/2017 11:38:22  Job execution switched to status RUNNING.
11:38:22,147 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - 09/21/2017 11:38:22  Source: Collection Source(1/1) switched to SCHEDULED 
11:38:22,155 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (debbe699a15106b9ef91ad4916a6ab5d) switched from CREATED to SCHEDULED.
11:38:22,156 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - 09/21/2017 11:38:22  TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124))(1/1) switched to SCHEDULED 
11:38:22,157 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (f168bd42f08204cc049673ab2985b4f0) switched from CREATED to SCHEDULED.
11:38:22,158 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - 09/21/2017 11:38:22  TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124))(1/1) switched to SCHEDULED 
11:38:22,164 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Collection Source (1/1) (712aa1b16f98f6a44ec52c60ed920a1c) switched from SCHEDULED to DEPLOYING.
11:38:22,165 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Source: Collection Source (1/1) (attempt #0) to 
11:38:22,178 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - 09/21/2017 11:38:22  Source: Collection Source(1/1) switched to DEPLOYING 
11:38:22,187 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (debbe699a15106b9ef91ad4916a6ab5d) switched from SCHEDULED to DEPLOYING.
11:38:22,189 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - 09/21/2017 11:38:22  TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124))(1/1) switched to DEPLOYING 
11:38:22,189 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (attempt #0) to 
11:38:22,265 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (f168bd42f08204cc049673ab2985b4f0) switched from SCHEDULED to DEPLOYING.
11:38:22,268 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (attempt #0) to 
11:38:22,269 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - 09/21/2017 11:38:22  TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124))(1/1) switched to DEPLOYING 
11:38:22,312 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Source: Collection Source (1/1)
11:38:22,313 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Collection Source (1/1) (712aa1b16f98f6a44ec52c60ed920a1c) switched from CREATED to DEPLOYING.
11:38:22,314 INFO  org.apache.flink.runtime.taskmanager.Task                     - Creating FileSystem stream leak safety net for task Source: Collection Source (1/1) (712aa1b16f98f6a44ec52c60ed920a1c) [DEPLOYING]
11:38:22,321 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Source: Collection Source (1/1) (712aa1b16f98f6a44ec52c60ed920a1c) [DEPLOYING].
11:38:22,328 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1)
11:38:22,331 INFO  org.apache.flink.runtime.taskmanager.Task                     - TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (debbe699a15106b9ef91ad4916a6ab5d) switched from CREATED to DEPLOYING.
11:38:22,331 INFO  org.apache.flink.runtime.taskmanager.Task                     - Creating FileSystem stream leak safety net for task TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (debbe699a15106b9ef91ad4916a6ab5d) [DEPLOYING]
11:38:22,332 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (debbe699a15106b9ef91ad4916a6ab5d) [DEPLOYING].
11:38:22,333 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Source: Collection Source (1/1) (712aa1b16f98f6a44ec52c60ed920a1c) [DEPLOYING].
11:38:22,337 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1)
11:38:22,336 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (debbe699a15106b9ef91ad4916a6ab5d) [DEPLOYING].
11:38:22,346 INFO  org.apache.flink.runtime.taskmanager.Task                     - TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (f168bd42f08204cc049673ab2985b4f0) switched from CREATED to DEPLOYING.
11:38:22,349 INFO  org.apache.flink.runtime.taskmanager.Task                     - Creating FileSystem stream leak safety net for task TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (f168bd42f08204cc049673ab2985b4f0) [DEPLOYING]
11:38:22,349 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (f168bd42f08204cc049673ab2985b4f0) [DEPLOYING].
11:38:22,351 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (f168bd42f08204cc049673ab2985b4f0) [DEPLOYING].
11:38:22,364 INFO  org.apache.flink.runtime.taskmanager.Task                     - TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (debbe699a15106b9ef91ad4916a6ab5d) switched from DEPLOYING to RUNNING.
11:38:22,366 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Collection Source (1/1) (712aa1b16f98f6a44ec52c60ed920a1c) switched from DEPLOYING to RUNNING.
11:38:22,366 INFO  org.apache.flink.runtime.taskmanager.Task                     - TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (f168bd42f08204cc049673ab2985b4f0) switched from DEPLOYING to RUNNING.
11:38:22,371 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been configured, using default state backend (Memory / JobManager)
11:38:22,386 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (debbe699a15106b9ef91ad4916a6ab5d) switched from DEPLOYING to RUNNING.
11:38:22,386 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - 09/21/2017 11:38:22  TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124))(1/1) switched to RUNNING 
11:38:22,377 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been configured, using default state backend (Memory / JobManager)
11:38:22,389 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (f168bd42f08204cc049673ab2985b4f0) switched from DEPLOYING to RUNNING.
11:38:22,390 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - 09/21/2017 11:38:22  TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124))(1/1) switched to RUNNING 
11:38:22,395 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Collection Source (1/1) (712aa1b16f98f6a44ec52c60ed920a1c) switched from DEPLOYING to RUNNING.
11:38:22,395 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - 09/21/2017 11:38:22  Source: Collection Source(1/1) switched to RUNNING 
11:38:22,377 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been configured, using default state backend (Memory / JobManager)
11:38:22,458 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Initializing heap keyed state backend with stream factory.
11:38:22,469 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Initializing heap keyed state backend with stream factory.

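For the next half hour there is no output at all, while my CPU is fully utilized. I log every call to MyDataAggregator and MovingAvgWindowFunction to see how long they take. These log lines only appear after the half hour has passed: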

12:09:06,106 INFO  com.myapplication      - MyDataAggregator
12:09:06,106 INFO  com.myapplication      - MyDataAggregator
12:09:06,106 INFO  com.myapplication      - MyDataAggregator
12:09:06,106 INFO  com.myapplication      - MovingAvgWindowFunction
12:09:06,107 INFO  com.myapplication      - MovingAvgWindowFunction
12:09:06,107 INFO  com.myapplication      - MyDataAggregator
...

Then the job finishes:

12:09:07,739 INFO  org.apache.flink.runtime.taskmanager.Task                     - TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (f168bd42f08204cc049673ab2985b4f0) switched from RUNNING to FINISHED.
12:09:07,739 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (f168bd42f08204cc049673ab2985b4f0).
12:09:07,740 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (f168bd42f08204cc049673ab2985b4f0) [FINISHED]
12:09:07,741 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FINISHED to JobManager for task TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (f168bd42f08204cc049673ab2985b4f0)
12:09:07,741 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (f168bd42f08204cc049673ab2985b4f0) switched from RUNNING to FINISHED.
12:09:07,741 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - 09/21/2017 12:09:07  TriggerWindow(SlidingEventTimeWindows(300000, 150000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@21ca5b67}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124))(1/1) switched to FINISHED 
12:09:07,744 INFO  org.apache.flink.runtime.taskmanager.Task                     - TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (debbe699a15106b9ef91ad4916a6ab5d) switched from RUNNING to FINISHED.
12:09:07,744 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (debbe699a15106b9ef91ad4916a6ab5d).
12:09:07,745 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (debbe699a15106b9ef91ad4916a6ab5d) [FINISHED]
12:09:07,756 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FINISHED to JobManager for task TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (debbe699a15106b9ef91ad4916a6ab5d)
12:09:07,759 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124)) (1/1) (debbe699a15106b9ef91ad4916a6ab5d) switched from RUNNING to FINISHED.
12:09:07,759 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (cd666d998a392d0907d5522babc80342) switched from state RUNNING to FINISHED.
12:09:07,759 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Stopping checkpoint coordinator for job cd666d998a392d0907d5522babc80342
12:09:07,759 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - 09/21/2017 12:09:07  TriggerWindow(SlidingEventTimeWindows(60000, 30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@44ebb3d8}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:1124))(1/1) switched to FINISHED 
12:09:07,759 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - 09/21/2017 12:09:07  Job execution switched to status FINISHED.
12:09:07,760 INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - Shutting down
12:09:07,771 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - Terminate JobClientActor.
12:09:07,771 INFO  org.apache.flink.runtime.client.JobClient                     - Job execution complete
12:09:07,772 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - Disconnect from JobManager Actor[akka://flink/user/jobmanager_1#-1402375298].

This is really strange, because there is no output at all for half an hour. Does anyone know what Flink is doing during that time?

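I know the local execution environment is not optimized, but even with 10k values, the windowing, keying, and my simple aggregation and average calculations should not take that long. The bottleneck is clearly the CPU, which is fully utilized the whole time. I gave the application 2 GB of RAM, and I/O does not seem to be the issue; my disk is barely used.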

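Edit: I experimented a bit with the data sets. If I reduce the 10k data set to only 5k, the execution time drops from half an hour to 4 minutes. This is really strange, since one would expect at most linear growth.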
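
As a rough sanity check on these two measurements, one can estimate the exponent k in a power-law model, runtime ~ n^k. This is only a sketch: two data points cannot establish a growth law, and the class and method names are made up for illustration.

```java
final class ScalingEstimate {

    /** Exponent k in runtime ~ n^k, estimated from two (input size, runtime) measurements. */
    static double exponent(double n1, double t1, double n2, double t2) {
        return Math.log(t2 / t1) / Math.log(n2 / n1);
    }

    public static void main(String[] args) {
        // 5k elements took about 4 minutes, 10k elements about 30 minutes.
        double k = exponent(5_000, 4.0, 10_000, 30.0);
        System.out.printf("estimated exponent: %.2f%n", k);
    }
}
```

Doubling the input multiplied the runtime by 7.5, which gives an exponent of roughly 2.9. If that held in general, the growth would be closer to cubic than to linear, which hints at repeated work over an ever-growing collection somewhere in the pipeline rather than at a constant per-element overhead.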

1 answer

齐高阳
2023-03-14

I would attach a profiler to Flink to find out what is consuming all those CPU cycles during the 30 minutes.

Flink should be able to process this amount of data in almost no time.
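
If a full profiler is not at hand, a crude sampling approach can already point at the hot spot. The sketch below uses only the JDK (`Thread.getAllStackTraces()`) and assumes that the local Flink execution runs its task threads in the same JVM as the test, which is typical for local execution but should be verified for your setup. `StackSampler` and the busy thread in `main` are hypothetical stand-ins, not part of Flink or flink-spector.

```java
import java.util.HashMap;
import java.util.Map;

final class StackSampler {

    /** Samples all live threads and counts the top stack frame of every RUNNABLE thread. */
    static Map<String, Integer> sample(int samples, long intervalMs) {
        Map<String, Integer> hits = new HashMap<>();
        for (int i = 0; i < samples; i++) {
            for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
                Thread t = e.getKey();
                StackTraceElement[] stack = e.getValue();
                // Skip the sampler itself, idle threads, and threads without Java frames.
                if (t == Thread.currentThread()
                        || t.getState() != Thread.State.RUNNABLE
                        || stack.length == 0) {
                    continue;
                }
                String frame = stack[0].getClassName() + "." + stack[0].getMethodName();
                hits.merge(frame, 1, Integer::sum);
            }
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop sampling
                break;
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // Hypothetical busy thread standing in for the Flink task threads.
        Thread busy = new Thread(() -> {
            long x = 0;
            while (!Thread.currentThread().isInterrupted()) {
                x += System.nanoTime() % 7;
            }
        }, "busy-worker");
        busy.setDaemon(true);
        busy.start();

        // Print the five most frequently seen top frames.
        sample(20, 10L).entrySet().stream()
            .sorted((a, b) -> b.getValue() - a.getValue())
            .limit(5)
            .forEach(e -> System.out.println(e.getValue() + "  " + e.getKey()));
        busy.interrupt();
    }
}
```

In the test, `sample` could be run from a background thread started before the job executes. Frames that dominate the counts are the first candidates to inspect. Taking a few thread dumps with `jstack` during the silent half hour gives similar information without any code changes.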
