我使用以下代码创建kafka流:
val streams = (1 to 5) map {i =>
KafkaUtils.createStream[....](
streamingContext,
Map( .... ),
Map(topic -> numOfPartitions),
StorageLevel.MEMORY_AND_DISK_SER
).filter(...)
.mapPartitions(...)
.reduceByKey(....)
val unifiedStream = streamingContext.union(streams)
unifiedStream.foreachRDD(...)
streamingContext.start()
我给每个流不同的组ID。当我运行应用程序时,只接收到部分kafka消息,并且执行程序在foreachRDD调用中挂起。如果我只创建一个流,一切正常。日志信息没有任何例外。
我不知道为什么应用程序卡在那里。这是否意味着没有足够的资源?
您想要尝试设置参数
SparkConf().set("spark.streaming.concurrentJobs", "5")
我试图在Kafka流之上实现一个简单的CQRS/Event sourcing概念验证(如https://www.confluent.io/blog/event-sourcing-using-apache-kafka/所述) 我有4个基本部分: 命令处理器-命令流,左与聚合状态KTABLE连接。对于结果流中的每个条目,使用函数生成结果事件,并将它们发布到主题 问题是--有没有办法确保我在州存储中有聚
在文件中,我将作为默认值Serde,然后使用使用字符串值。 当我将以下流的配置作为值的默认值时,我看到Avro流(第一个)运行良好,并使用我在该主题上发布的内容。但是当我使用相同的配置发布到字符串值流时,会出现异常。 以下是发布topicTwo和TopicTrey的例外:
配置的application.yaml如下所示。这个想法是我有3个输入和3个输出主题。该组件从input topic获取输入,并将输出提供给OutputTopic。 引发的异常为 谁能帮助我与Kafka Streams Spring-Kafka代码样本处理与多个输入和输出主题。 更新:2021年1月21日
我有一个应用程序需要收听多个不同的主题;每个主题都有关于如何处理消息的单独逻辑。我曾想过为每个KafkaStreams实例使用相同的kafka属性,但我得到了如下所示的错误。 错误 流处理应用程序的标识符。在Kafka集群中必须是唯一的。它用作1)默认的客户端ID前缀,2)用于成员资格管理的组ID,3)变更日志主题前缀。 问题 此错误意味着什么,以及导致此错误的原因。 假设您可以有应用程序的多个实
我正在使用标记为我的搜索框创建一个建议列表,但是我不能从数据列表中选择多个值。当前,我得HTML是:
我使用结构化流媒体(Spark 2.0.2)来消费Kafka消息。使用scalapb,protobuf中的消息。我得到以下错误。请帮助。。 线程“main”scala中的异常。ScalaRefltionException:不是一个术语org.apache.spark.sql.catalyst.符号$SymbolApi$9.apply术语(Seflection.scala:592)org.apach