当前位置: 首页 > 知识库问答 >
问题:

如何在Spark Streaming DirectAPI中从每个Kafka分区并发读取

汝跃
2023-03-14

如果我是正确的,默认情况下,spark streaming 1.6.1使用单线程从每个Kafka分区读取数据,假设我的Kafka主题分区是50,这意味着每50个分区中的消息将按顺序读取,或者可能以循环方式读取。

案例1:

-如果是,那么我如何在分区级别并行化读取操作?创建多个< code > kafkautils . createdirectstream 是唯一的解决方案吗?

e.g.
      val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet).map(_._2)

      val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet).map(_._2)

案例2:

-如果我的kafka分区每秒接收5条消息,那么在这种情况下,< code > "-conf spark . streaming . Kafka . maxratepartition = 3 " 和< code > "-conf spark . streaming . block interval " 属性是如何出现的?

共有2个答案

邢令
2023-03-14

在情况二:

spark.streaming.blockInterval

只有冲击接收器,您可以看到文档:

在将 Spark 流式处理接收器存储在 Spark 中之前,将这些数据块分块到数据块的时间间隔。

spark.streaming.kafka.maxRatePerPartition = 3 < 5(you say)

总延迟会增加,你可以看到这一点

http://spark . Apache . org/docs/latest/streaming-programming-guide . html # setting-the-right-batch-interval

公子昂
2023-03-14

在直接模型中:

  • 按顺序访问每个分区
  • 并行访问不同的分区

在第二种情况下,它取决于时间间隔,但通常如果maxRatePerPartition低于每秒实际速率乘以批处理窗口,您将始终滞后。

 类似资料:
  • 我在数据流中使用“keyby”。我要flink发现每个键的所有kafka分区。我有30个分区

  • 我想知道一个使用者如何从多个分区使用消息,具体来说,从不同的分区读取消息的顺序是什么? 我看了一眼源代码(Consumer,Fetcher),但我不能完全理解。 这是我以为会发生的: 分区是顺序读取的。也就是说:在继续下一个分区之前,一个分区中的所有消息都将被读取。如果我们达到< code>max.poll.records而没有消耗整个分区,则下一次读取将继续读取当前分区,直到耗尽为止,然后继续下

  • 问题内容: 使用KTable时,当实例/使用者数等于分区数时,Kafka流不允许实例从特定主题的多个分区中读取。我尝试使用GlobalKTable实现此目的,但问题是数据将被覆盖,也无法对其应用聚合。 假设我有一个名为“ data_in”的主题,具有3个分区(P1,P2,P3)。当我运行Kafka流应用程序的3个实例(I1,I2,I3)时,我希望每个实例都从“ data_in”的所有分区中读取数据

  • 我设置了一个Spring集成流程来处理一个有3个分区的主题,并将侦听器容器的并发性设置为3。正如所料,我看到三个线程处理来自所有3个分区的批处理。然而,我发现在某些情况下,一个侦听器线程可能处理包含来自多个分区的消息的单个批处理。在kafka中,我的数据是按id划分的,因此它可以与其他id同时处理,但不能在另一个线程上与相同的id一起处理(我很惊讶地发现这种情况正在发生)。通过阅读文档,我认为每个

  • 当我使用Spark从S3读取多个文件时(例如,一个包含许多Parquet文件的目录)- 逻辑分区是在开始时发生,然后每个执行器直接下载数据(在worker节点上)吗?< br >还是驱动程序下载数据(部分或全部),然后进行分区并将数据发送给执行器? 此外,分区是否默认为用于写入的相同分区(即每个文件= 1个分区)?

  • TL;DR;我试图理解一个被分配了多个分区的单个使用者是如何处理reach分区的消费记录的。 例如: 在移动到下一个分区之前,会完全处理一个分区。 每次处理每个分区中的可用记录块。 从第一个可用分区处理一批N条记录 以循环旋转方式处理来自分区的N条记录 我找到了或分配程序的配置,但这只决定了使用者如何分配分区,而不是它如何从分配给它的分区中使用。 我开始深入研究KafkaConsumer源代码,#