我们有一个在 Java 上编写并在 AWS Kinesis Data Analytics 上运行的 flink 应用程序。应用程序从 AWS 托管服务 Kafka(kafka 主题 1)读取输入流,然后应用业务逻辑(一些计算),最后将输出写入另一个 Kafka 主题(kafka 主题 2)。
并行度为 10,主题有 15 个分区。预计在 5 分钟内处理 ~20K 并发数据。但是,经过所有优化后,我们可以在25分钟内将其速度提高到约20K并发数据的速度。
如果有任何其他可以实现目标的性能优化,请告诉我。
Flink Async I/O 是否会成为进一步优化的选项?
StreamExecutionEnvironment streamenv =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> initialStreamData = streamenv
.addSource(new FlinkKafkaConsumer<>(
TOPIC_NAME,
new ObjectNodeJsonDeSerializerSchema(),
kafkaConnectProperties);
initialStreamData.print();
DataStream<POJO> rawDataProcess = initialStreamData
.rebalance()
.flatMap(new ReProcessingDataProvider())
.keyBy(value -> value.getPersonId());
rawDataProcess.print();
DataStream<POJO> cgmStream = rawDataProcess
.keyBy(new ReProcessorKeySelector())
.rebalance()
.flatMap(new SgStreamTask());
cgmStream.print();
DataStream<POJO> artfctOverlapStream = null;
artfctOverlapStream = cgmStreamData
.keyBy(new CGMKeySelector())
.countWindow(2, 1)
.apply(new ArtifactOverlapProvider()); //the same person_id key
cgmStreamData.print();
DataStream<POJO> streamWithSgRoc = null;
streamWithSgRoc = artfctOverlapStream
.keyBy(new CGMKeySelector())
.countWindow(7, 1)
.apply(new SgRocProvider()); // the same person_id key
streamWithSgRoc.print();
DataStream<POJO> cgmExcursionStream = null;
cgmExcursionStream = streamWithSgRoc
.keyBy(new CGMKeySelector())
.countWindow(Common.THREE, Common.ONE)
.apply(new CGMExcursionProviderStream()); //the same person_id key
cgmExcursionStream.print();
cgmExcursionStream
.addSink(new FlinkKafkaProducer<CGMDataCollector>(
topicsProperties.getProperty(Common.CGM_EVENT_TOPIC),
new CGMDataCollectorSchema(),
kafkaConnectProperties));
在您分享的内容中,我看不到任何可以解释吞吐量低的原因。但是既然您询问了异步i/o,我想知道平面图是否正在进行一些外部i/o。如果是这样,那就可以解释了。如果是这样的话,那么使用异步i/o应该会有很大帮助,前提是外部服务可以处理增加的负载。
我还想知道为什么并行度是10,这10个插槽有哪些可用资源。是否有足够的内核来保持一切运转?对于15个分区,有5个插槽分别处理两个分区,另外5个插槽分别处理一个分区。5、8和15是更明显的并行选择。(当然,如果每个槽也访问平面图中的外部服务,这也需要考虑在内。)
看到代码后更新:
你可以做几件简单的事情来加快速度。
要做的一件事是为群集提供更多资源。您可以将并行性保持原样,但通过将任务管理器放在具有更多内核的计算机上,为每个插槽提供更多内核以供使用。
但在这样做之前,请先看一下优化管道。重新平衡和keyBy都相当昂贵,并且您使用它们的次数超过了必要的程度。有一个键By,然后立即进行重新平衡是没有意义的,有两个keyBy的一个紧接着另一个也没有意义。
Rebalance对数据流进行往返重新分区。这通常是在改变并行度或需要克服数据不对称时进行的。几乎从不使用重新平衡,除非在需要更改并行度时隐式使用。
KeyBy 执行基于密钥的重新分区。如果一个键By跟随另一个键,则第二个键将撤消第一个键所做的任何操作。
keyBy和rebalance都需要序列化和反序列化每个事件,并通过网络堆栈发送它们。这是你只有在绝对必要的情况下才想做的事情。
修复这些再平衡/密钥管理问题将减少集群上的工作负载。如果这还不足以实现所需的吞吐量,那么为每个插槽提供更多的内核(以便管道的各个阶段可以并行运行)应该可以做到这一点。
问题内容: 在处理多个千兆字节文件时,我注意到了一些奇怪的事情:似乎使用文件通道从文件读取到分配有allocateDirect的重复使用的ByteBuffer对象中,比从MappedByteBuffer中读取要慢得多,实际上,它甚至比读取字节中的记录还要慢。使用常规读取调用的数组! 我期望它(几乎)与从mapedbytebuffers读取的速度一样快,因为我的ByteBuffer是使用alloca
在过去的几天里,当我使用Java6、Struts3框架和Tomcat7服务器对我的产品进行性能测试时,我一直面临着这个奇怪的问题
问题内容: 我在Java2D方面表现有些古怪。我知道sun.java2d.opengl VM参数可以为2D启用3D加速,但是即使使用该参数也有一些奇怪的问题。 这是我运行的测试结果: 在JComponent上绘制具有32x32像素图块的25x18地图, 图像1 = .bmp格式,图像2 = .png格式 没有-Dsun.java2d.opengl = true 使用.BMP图像1的120 FPS使
最近,我们将数据库从11g更新为19c。 在新数据库版本中测试应用程序时,我们遇到了一个特定视图的性能问题,该视图工作得非常好,但在19c中会导致性能问题。 在分析计划时,我们看到执行计划发生了巨大变化,这导致了19c中视图的性能非常差。 令人惊讶的是,其他观点的效果很好。 如果你能对这个问题有所了解,那就太好了。 谢谢你,JD
安装(下载 这是Flink的默认配置。 关于这里发生了什么事,有什么建议吗?
从示例中,我看到了下面的代码片段,它运行良好。但问题是:我并不总是需要处理输入流并将其生成到接收器。 如果我有一个应用程序,根据某些事件,我必须只发布到kafka主题,以便下游应用程序可以做出某些决定。这意味着,我实际上没有输入流,但我只知道当我的应用程序中发生某些事情时,我需要向kafka的特定主题发布消息。也就是说,我只需要一个接收器。 我查看了示例,但没有找到符合我要求的任何内容。有没有一种