当前位置: 首页 > 知识库问答 >
问题:

Spark结构化流式多线程/多消费者

宗政松
2023-03-14

我正在使用spark结构化流媒体、合流开源Kafka集群开发spark流媒体应用程序,并在AWS EMR中运行spark job。我们至少有20个Kafka主题,以AVRO格式将数据生成单个Kafka主题,每个主题在3到4个分区之间进行了分区。我正在使用SparkReadStream阅读所有20个主题(逗号分隔的主题值)。然后从生成的数据帧中过滤每个消息行,使用正确的Avro模式应用每个消息,并将生成的数据集[T]写入S3和Cassandra。

一些我找不到答案的问题

>

火花应用程序如何在分区之间分配处理?火花是否使用不同的执行器并行读取这些主题/分区,或者我是否需要为每个分区实现任何多线程

是否可以扩展到一个消费者组中的多个消费者以并行化?

抱歉问了这么多问题,我想这些问题都是相关的。感谢您的任何反馈或指点,我可以在那里找到文档。

>

  • MyConfig

    val kafkaParams=  Map("kafka.bootstrap.servers" -> "topic1,topic2,topic3,topic4,topic5,
        "failOnDataLoss" -> param.fail_on_data_loss.toString,
        "subscribe" -> param.topics.toString,
        "startingOffsets" -> param.starting_offsets.toString,
        "kafka.security.protocol" -> param.kafka_security_protocol.toString,
        "kafka.ssl.truststore.location" -> param.kafka_ssl_truststore_location.toString,
        "kafka.ssl.truststore.password" -> param.kafka_ssl_truststore_password.toString
      )
    

    ReadStream代码

    val df = sparkSession.readStream
      .format("kafka")
      .options(kafkaParams)
      .load()
    

    然后使用“主题”列将输入数据帧拆分为多个数据帧,并为每个生成的数据帧应用Avro模式。

    将每个Dataset[T]写入不同的接收器,如S3、Cassandra等...

  • 共有1个答案

    齐振
    2023-03-14

    我可以为所有主题使用一个ReadStream吗?

    假设所有主题都可以使用同一组Kafka配置,那么当然。但可能不具备容错能力。例如,failOnDataLoss将导致整个作业在单个主题失败时失败。

    它会被认为是所有主题和分区的一个火花消费者吗。。。spark是否使用不同的执行器并行读取这些主题/分区?

    是的。您可以将Spark执行器的数量扩展到所有主题的分区总和。

    我需要为每个分区实现任何多线程吗?

    Spark应该帮你解决这个问题。

    是否可以扩展到一个消费者组中的多个消费者以并行化?

    你应该试着设置一个组。id属性,但有多个执行者已经在创建一个消费者组。

    与这个问题无关——你想做的已经是Kafka连接的目的了。将Kafka数据读入各种数据源。S3和Cassandra已经是两个现有的插件实现。

     类似资料:
    • 在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之

    • 关于破坏者,我有以下问题: 消费者(事件处理器)没有实现他们实现EventHandler的任何可调用或可运行接口,那么他们如何能够并行运行,因此,例如,我有一个disruptor实现,其中有这样一个菱形模式 其中c1到c3可以在p1之后并联工作,C4和C5在p1之后工作。 所以通常我会有这样的东西(P1和C1-C5是可运行/可调用的) 但是在Disruptor的情况下,我的事件处理程序都没有实现R

    • 我是Kafka流媒体的新手。我使用python设置了一个twitter监听器,它运行在localhost:9092kafka服务器中。我可以使用kafka客户端工具(conduktor)并使用命令“bin/kafka-console-consumer.sh--bootstrap-server localhost:9092-topic twitter--from-begind”来使用侦听器生成的流,

    • 我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。

    • 我编写了一个Kafka消费者(使用Spring Kafka),它从单个主题读取并且是消费者组的一部分。一旦消息被消费,它将执行所有下游操作并继续下一个消息偏移量。我已将其打包为WAR文件,我的部署管道将其推送到单个实例。使用我的部署管道,我可能可以将此工件部署到我的部署池中的多个实例。 但是,当我希望多个消费者作为我的基础设施的一部分时,我无法理解以下内容- > 实际上,我可以在部署池中定义多个实