当前位置: 首页 > 知识库问答 >
问题:

使用多个Kafka流进行Spark Streaming

李飞翼
2023-03-14

我使用以下代码创建kafka流:

val streams = (1 to 5) map {i => 
    KafkaUtils.createStream[....](
              streamingContext,
              Map( .... ),
              Map(topic -> numOfPartitions),
              StorageLevel.MEMORY_AND_DISK_SER
              ).filter(...)
              .mapPartitions(...)
              .reduceByKey(....)
val unifiedStream = streamingContext.union(streams)
unifiedStream.foreachRDD(...)
streamingContext.start()

我给每个流不同的组ID。当我运行应用程序时,只接收到部分kafka消息,并且执行程序在foreachRDD调用中挂起。如果我只创建一个流,一切正常。日志信息没有任何例外。

我不知道为什么应用程序卡在那里。这是否意味着没有足够的资源?

共有1个答案

郑嘉年
2023-03-14

您想要尝试设置参数

SparkConf().set("spark.streaming.concurrentJobs", "5")
 类似资料:
  • 我试图在Kafka流之上实现一个简单的CQRS/Event sourcing概念验证(如https://www.confluent.io/blog/event-sourcing-using-apache-kafka/所述) 我有4个基本部分: 命令处理器-命令流,左与聚合状态KTABLE连接。对于结果流中的每个条目,使用函数生成结果事件,并将它们发布到主题 问题是--有没有办法确保我在州存储中有聚

  • 在文件中,我将作为默认值Serde,然后使用使用字符串值。 当我将以下流的配置作为值的默认值时,我看到Avro流(第一个)运行良好,并使用我在该主题上发布的内容。但是当我使用相同的配置发布到字符串值流时,会出现异常。 以下是发布topicTwo和TopicTrey的例外:

  • 配置的application.yaml如下所示。这个想法是我有3个输入和3个输出主题。该组件从input topic获取输入,并将输出提供给OutputTopic。 引发的异常为 谁能帮助我与Kafka Streams Spring-Kafka代码样本处理与多个输入和输出主题。 更新:2021年1月21日

  • 我有一个应用程序需要收听多个不同的主题;每个主题都有关于如何处理消息的单独逻辑。我曾想过为每个KafkaStreams实例使用相同的kafka属性,但我得到了如下所示的错误。 错误 流处理应用程序的标识符。在Kafka集群中必须是唯一的。它用作1)默认的客户端ID前缀,2)用于成员资格管理的组ID,3)变更日志主题前缀。 问题 此错误意味着什么,以及导致此错误的原因。 假设您可以有应用程序的多个实

  • 我正在使用标记为我的搜索框创建一个建议列表,但是我不能从数据列表中选择多个值。当前,我得HTML是:

  • 我使用结构化流媒体(Spark 2.0.2)来消费Kafka消息。使用scalapb,protobuf中的消息。我得到以下错误。请帮助。。 线程“main”scala中的异常。ScalaRefltionException:不是一个术语org.apache.spark.sql.catalyst.符号$SymbolApi$9.apply术语(Seflection.scala:592)org.apach