当前位置: 首页 > 知识库问答 >
问题:

从Kafka读取时如何异步制作火花流

通迪
2023-03-14

我有一个Kafka分区,和一个parkStreaming应用程序。一个服务器有10个内核。当火花流从Kafka收到一条消息时,后续过程将需要5秒钟(这是我的代码)。所以我发现火花流读取Kafka消息很慢,我猜当火花读出一条消息时,它会等到消息被处理,所以读取和处理是同步的。我想知道我可以异步读取火花吗?这样从Kafka读取的数据就不会被后续处理拖动。然后火花会很快消耗来自Kafka的数据。然后我可以专注于火花内部缓慢的数据处理。顺便说一句,我正在使用foreachRDD函数。

共有1个答案

胡鸿远
2023-03-14

您可以增加kafka中的分区数量,它应该可以提高并行性,您也可以尝试使用“Direct kafka接收器”,这确实可以提高您的应用程序从kafka读取时的性能

 类似资料:
  • 请帮助理解为什么不提取为8:15am? W3C日期和时间格式 示例1994-11-05T08:15:30-05:00对应于美国东部标准时间1994年11月5日上午8:15:30。 用于格式化和解析的日期时间模式

  • 我已经在Ubuntu上设置了Kafka和Spark。我正在尝试阅读Kafka的主题通过火花流使用pyspark(Jupyter笔记本)。Spark既没有读取数据,也没有抛出任何错误。 null Kafka生产者:bin/kafka-console-producer.sh--broker-list localhost:9092--topic new_topic Kafka使用者:bin/kafka-

  • 问题内容: 当有字符可用时,是否有一种优雅的方法来触发事件?我想避免投票。 问题答案: 您将必须创建一个单独的线程以阻止读取,直到有可用的线程为止。 如果您不想实际消耗输入,则必须用内部缓冲区包装它,读入缓冲区,然后喊叫,并在要求输入时从缓冲区返回数据。 您可以这样解决:

  • 试图读取一个空的镶木地板目录,得到这个错误 无法指定拼花地板的架构。必须手动指定 我的代码 尝试处理scala尝试和定期检查空目录 任何想法

  • 我正在运行以下scala代码: 我知道firstStruct是structType,StructFields的一个名称是“name”,但在尝试强制转换时似乎失败了。我被告知spark/hive结构与scala不同,但为了使用structType,我需要 所以我想他们应该是同一种类型的。 我看了看这里:https://github.com/apache/spark/blob/master/sql/c