当前位置: 首页 > 知识库问答 >
问题:

Apache Flink使用Java-性能问题

羊毅庵
2023-03-14

我们有一个在 Java 上编写并在 AWS Kinesis Data Analytics 上运行的 flink 应用程序。应用程序从 AWS 托管服务 Kafka(kafka 主题 1)读取输入流,然后应用业务逻辑(一些计算),最后将输出写入另一个 Kafka 主题(kafka 主题 2)。

并行度为 10,主题有 15 个分区。预计在 5 分钟内处理 ~20K 并发数据。但是,经过所有优化后,我们可以在25分钟内将其速度提高到约20K并发数据的速度。

如果有任何其他可以实现目标的性能优化,请告诉我。

Flink Async I/O 是否会成为进一步优化的选项?

StreamExecutionEnvironment streamenv =
    StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<ObjectNode> initialStreamData = streamenv
    .addSource(new FlinkKafkaConsumer<>(
        TOPIC_NAME, 
        new ObjectNodeJsonDeSerializerSchema(),
        kafkaConnectProperties);

initialStreamData.print();

DataStream<POJO> rawDataProcess = initialStreamData
    .rebalance()
    .flatMap(new ReProcessingDataProvider())
    .keyBy(value -> value.getPersonId());

rawDataProcess.print();

DataStream<POJO> cgmStream = rawDataProcess
    .keyBy(new ReProcessorKeySelector())
    .rebalance()
    .flatMap(new SgStreamTask());

cgmStream.print();

DataStream<POJO> artfctOverlapStream = null;
artfctOverlapStream = cgmStreamData
    .keyBy(new CGMKeySelector())
    .countWindow(2, 1)
    .apply(new ArtifactOverlapProvider()); //the same person_id key

cgmStreamData.print();

DataStream<POJO> streamWithSgRoc = null;

streamWithSgRoc = artfctOverlapStream
    .keyBy(new CGMKeySelector())
    .countWindow(7, 1)
    .apply(new SgRocProvider()); // the same person_id key 

streamWithSgRoc.print();

DataStream<POJO> cgmExcursionStream = null;

cgmExcursionStream = streamWithSgRoc
    .keyBy(new CGMKeySelector())
    .countWindow(Common.THREE, Common.ONE)
    .apply(new CGMExcursionProviderStream()); //the same person_id key

cgmExcursionStream.print();

cgmExcursionStream
    .addSink(new FlinkKafkaProducer<CGMDataCollector>(
        topicsProperties.getProperty(Common.CGM_EVENT_TOPIC),
        new CGMDataCollectorSchema(),
        kafkaConnectProperties));

共有1个答案

狄鸿禧
2023-03-14

在您分享的内容中,我看不到任何可以解释吞吐量低的原因。但是既然您询问了异步i/o,我想知道平面图是否正在进行一些外部i/o。如果是这样,那就可以解释了。如果是这样的话,那么使用异步i/o应该会有很大帮助,前提是外部服务可以处理增加的负载。

我还想知道为什么并行度是10,这10个插槽有哪些可用资源。是否有足够的内核来保持一切运转?对于15个分区,有5个插槽分别处理两个分区,另外5个插槽分别处理一个分区。5、8和15是更明显的并行选择。(当然,如果每个槽也访问平面图中的外部服务,这也需要考虑在内。)

看到代码后更新:

你可以做几件简单的事情来加快速度。

要做的一件事是为群集提供更多资源。您可以将并行性保持原样,但通过将任务管理器放在具有更多内核的计算机上,为每个插槽提供更多内核以供使用。

但在这样做之前,请先看一下优化管道。重新平衡和keyBy都相当昂贵,并且您使用它们的次数超过了必要的程度。有一个键By,然后立即进行重新平衡是没有意义的,有两个keyBy的一个紧接着另一个也没有意义。

Rebalance对数据流进行往返重新分区。这通常是在改变并行度或需要克服数据不对称时进行的。几乎从不使用重新平衡,除非在需要更改并行度时隐式使用。

KeyBy 执行基于密钥的重新分区。如果一个键By跟随另一个键,则第二个键将撤消第一个键所做的任何操作。

keyBy和rebalance都需要序列化和反序列化每个事件,并通过网络堆栈发送它们。这是你只有在绝对必要的情况下才想做的事情。

修复这些再平衡/密钥管理问题将减少集群上的工作负载。如果这还不足以实现所需的吞吐量,那么为每个插槽提供更多的内核(以便管道的各个阶段可以并行运行)应该可以做到这一点。

 类似资料:
  • 问题内容: 在处理多个千兆字节文件时,我注意到了一些奇怪的事情:似乎使用文件通道从文件读取到分配有allocateDirect的重复使用的ByteBuffer对象中,比从MappedByteBuffer中读取要慢得多,实际上,它甚至比读取字节中的记录还要慢。使用常规读取调用的数组! 我期望它(几乎)与从mapedbytebuffers读取的速度一样快,因为我的ByteBuffer是使用alloca

  • 在过去的几天里,当我使用Java6、Struts3框架和Tomcat7服务器对我的产品进行性能测试时,我一直面临着这个奇怪的问题

  • 问题内容: 我在Java2D方面表现有些古怪。我知道sun.java2d.opengl VM参数可以为2D启用3D加速,但是即使使用该参数也有一些奇怪的问题。 这是我运行的测试结果: 在JComponent上绘制具有32x32像素图块的25x18地图, 图像1 = .bmp格式,图像2 = .png格式 没有-Dsun.java2d.opengl = true 使用.BMP图像1的120 FPS使

  • 最近,我们将数据库从11g更新为19c。 在新数据库版本中测试应用程序时,我们遇到了一个特定视图的性能问题,该视图工作得非常好,但在19c中会导致性能问题。 在分析计划时,我们看到执行计划发生了巨大变化,这导致了19c中视图的性能非常差。 令人惊讶的是,其他观点的效果很好。 如果你能对这个问题有所了解,那就太好了。 谢谢你,JD

  • 安装(下载 这是Flink的默认配置。 关于这里发生了什么事,有什么建议吗?

  • 从示例中,我看到了下面的代码片段,它运行良好。但问题是:我并不总是需要处理输入流并将其生成到接收器。 如果我有一个应用程序,根据某些事件,我必须只发布到kafka主题,以便下游应用程序可以做出某些决定。这意味着,我实际上没有输入流,但我只知道当我的应用程序中发生某些事情时,我需要向kafka的特定主题发布消息。也就是说,我只需要一个接收器。 我查看了示例,但没有找到符合我要求的任何内容。有没有一种