var df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "cust*")
.option("startingOffsets", "earliest")
.load()
.withColumn("value", $"value".cast("string"))
.filter($"value".isNotNull)
上面的流式查询将来自这两个主题的数据流化。
假设我有两个Spark模式,每个主题一个:
var cust: StructType = new StructType()
.add("name", StringType)
.add("age", IntegerType)
var customers: StructType = new StructType()
.add("id", IntegerType)
.add("first_name", StringType)
.add("last_name", StringType)
.add("email", StringType)
.add("address", StringType)
现在,我想应用基于主题名称的Spark模式,为此,我编写了一个udf,它读取主题名称并以DDL格式返回模式:
val schema = udf((table: String) => (table) match {
case ("cust") => cust.toDDL
case ("customers") => customers.toDDL
case _ => new StructType().toDDL
})
val query = df
.withColumn("topic", $"topic".cast("string"))
.withColumn("data", from_json($"value", schema($"topic")))
.select($"key", $"topic", $"data.*")
.writeStream.outputMode("append")
.format("console")
.start()
.awaitTermination()
org.apache.spark.sql.AnalysisException: Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of UDF(topic);
你所做的是不可能的。您的查询
df不能有两个不同的架构。
我可以想出两种方法来做这件事:
DF
,然后将2个架构应用于2个DF(CUST
和Customers
)我在Scala中设置了Spark Kafka Consumer,它接收来自多个主题的消息: 我需要为每个主题的消息(将采用JSON格式)开发相应的操作代码。 我提到了以下问题,但其中的答案对我没有帮助: 从spark中的Kafka消息获取主题 那么,在接收到的DStream上是否有任何方法可用于获取主题名称以及消息以确定应该采取什么行动? 对此任何帮助都将不胜感激。谢谢你。
我有这样的拓扑: 拓扑中最后提到的源是每个应用程序实例的特定主题。我希望该主题仅由该实例处理。此主题的数据由前一个处理器推送,基于哪个实例必须处理该消息。 但是一旦流启动,它会尝试将实例特定的主题分区也分配给其他实例。我们可以在Kafka流中实现这个要求吗? 我希望一个主题仅由特定实例处理。
我有一些关于Kafka主题分区->spark流媒体资源利用的用例,我想更清楚地说明这些用例。 我使用spark独立模式,所以我只有“执行者总数”和“执行者内存”的设置。据我所知并根据文档,将并行性引入Spark streaming的方法是使用分区的Kafka主题->RDD将具有与Kafka相同数量的分区,当我使用spark-kafka直接流集成时。 因此,如果我在主题中有一个分区和一个执行器核心,
当一个DLQ被设置为一个Spring云流Kafka消费者时,DLQ写入的主题可以被分区吗?我有一个要求,使密钥等于一个特定的字段,我想知道这将如何与Spring云流。
我有一个真正的env与3个kafka机器集群,它正在接收大量数据。对于每个主题,有 25 个分区,复制因子设置为 2。 我的应用程序(基于kafka流的应用程序)从这个kafka集群获取数据的应用程序停机了一个多月。现在,每个分区都有大量滞后;高达90000000。 我知道以下参数: 我有两个消费者节点(使用kafka集群数据的相同组id)。 然而,它并没有赶上滞后,而是保持不变。有人能建议如何改
我正在使用Maven开发独立应用程序。我尝试了以下场景:1。它使用maven shade插件解包所有的依赖项,这样我的签名JAR(bcprov-jdk16-1.46.jar)就被解包了。因此丢失了JAR的标志,因为am收到安全异常“JCE无法验证提供者BC”。2.使用maven assembly插件(基于assembly.xml)和maven jar插件(为所有jar设置类路径),实现了将所有依赖