当前位置: 首页 > 知识库问答 >
问题:

Flink批连接性能

寇景明
2023-03-14

我一直在批处理模式下测试使用TableApi和DataStream api的简单联接。然而,我得到了非常糟糕的结果,所以它一定是我做错了什么。用于联接的数据集约为 900gb 和 3gb。用于测试的环境是具有 10 * m5.xlarge 工作节点的 EMR。

使用的TableApi方法是在数据s3路径上创建一个表,并在目标s3路径上对创建的表执行插入语句。通过调整任务管理器内存、numberOfTaskSlot、并行性,但无法使其在某种程度上可以接受的时间内执行(至少1.5h)。

在批处理模式下使用DataStreamApi时,我总是会遇到这样一个问题:由于Thread占用了超过90%的磁盘空间,因此它会终止任务。所以我不知道这是因为代码,还是仅仅因为flink需要比spark更多的磁盘空间。读取数据流:

    val sourceStream: DataStream[SourceObj] = env.fromSource(source, WatermarkStrategy.noWatermarks(), "someSourceName")
      .map(x => SourceObj.convertFromString(x))

加入:

    val joinedStream = sourceStream.join(sourceStream2)
      .where(col1 => sourceStream.col1)
      .equalTo(col2 => sourceStream2.col2)
      .window(GlobalWindows.create())
      .trigger(CountTrigger.of(1))
      .apply{
        (s, c) => JoinedObj(c.col1, s.col2, s.col3)
      }

是我缺少某些内容,还是只需要纵向扩展群集?

共有2个答案

顾均
2023-03-14

当您在批处理模式下使用DataStream API时,它会在所有随机/加入/减少操作中广泛使用托管内存。此外,如最后一段所述,Flink将在加入期间将无法放入内存的所有数据溢出到磁盘。

因此,我认为这可能是磁盘空间不足问题的原因。我的工作也遇到了同样的问题。

方飞翼
2023-03-14

一般来说,您最好使用Flink的Table/SQLAPI实现关系工作负载,这样它的优化器就有机会提供帮助。

但是如果我没有理解错的话,这个特殊的连接执行起来会非常昂贵,因为没有任何东西会从状态中过期。这两个表都将在Flink中完全具体化,因为对于这个查询,每一行输入都是相关的,并可能影响结果。

如果您可以将它转换为具有时间约束的某种连接,优化程序可以使用该约束来释放不再有用的行,那么它的行为将会好得多。

 类似资料:
  • 在Apache Flink流处理中,连接操作与连接有何不同,因此CoProcessFunction和ProcessJoinFunction有何不同,这是CoProcessFunction提供的onTimer函数吗?您能否提供一个适用于以相互排斥的方式连接/连接的示例用例。

  • 我使用的是和连接器jar版本为0.10.2,kafka版本为0.9.1,flink版本为1.0.0。 当我在IDE中作为独立的主程序运行Java消费者时,它工作得很好。但是当我从运行它时,我不会看到正在使用的消息,也不会看到中JobManager的stdout中的任何日志。请告诉我可能有什么问题。

  • 我想使用Flink流媒体以低延迟处理市场数据( 我有一组计算,每个都订阅三个流:缓慢移动的参数数据、股票价格和汇率。 例如。 Params(缓慢滴答:每天一次或两次): 资源(每秒多次滴答声): fx(每秒多次滴答声): 每当任何股票、外汇汇率或参数数据发生变化时,我都想立即计算结果并将其输出为新流。这在逻辑上可以表示为连接: 例如选择价格=(params.strike-asset.spot)*f

  • 我正在使用Apache Flink,并尝试通过使用Apache Kafka协议从它接收消息来连接到Azure eventhub。我设法连接到Azure eventhub并接收消息,但我不能使用这里(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-star

  • 为什么AWS SQS不是Apache Flink的默认连接器?这样做有技术限制吗?还是只是一些没有完成的事情?我想实现这一点,任何指点都将不胜感激

  • 我试图从动态表和基于某些字段的流中派生新表。 有没有人能为你提供最好的指导。我对flink和尝试新事物是陌生的。 书籍 ============================ BookId, Instruments, Quantity Book1, Goog,100 Book2, Vod,10 Book1, Appl,50 Book2, Goog,60 Book1, Vod,130 Book3,