val kafka_bootstrap_servers = "localhost:9092"
val users_df = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", kafka_bootstrap_servers)
.option("subscribe", kafka_topic_name)
.load()
val users_df_1 = users_df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
val user_schema = StructType(
List(
StructField("RecordNumber", IntegerType, true),
StructField("Zipcode", StringType, true),
StructField("ZipCodeType", StringType, true),
StructField("City", StringType, true),
StructField("State", StringType, true),
StructField("LocationType", StringType, true),
StructField("Lat", StringType, true),
StructField("Long", StringType, true),
StructField("Xaxis", StringType, true),
StructField("Yaxis", StringType, true),
StructField("Zaxis", StringType, true),
StructField("WorldRegion", StringType, true),
StructField("Country", StringType, true),
StructField("LocationText", StringType, true),
StructField("Location", StringType, true),
StructField("Decommisioned", StringType, true)
)
)
val users_df_2 = users_df_1.select(from_json(col("RecordNumber"), user_schema)
.as("user_detail"), col("Zipcode"))
val users_df_3 = users_df_2.select(col = "user_detail.*", "Zipcode")
users_df_3.printSchema()
users_df_3.show(numRows = 10, truncate = false)
spark.stop()
println("Apache spark application completed.")
}
}
下面的json数据示例
{"RecordNumber":76511,"Zipcode":27007,"ZipCodeType":"STANDARD","City":"ASH HILL","State":"NC","LocationType":"NOT ACCEPTABLE","Lat":36.4,"Long":-80.56,"Xaxis":0.13,"Yaxis":-0.79,"Zaxis":0.59,"WorldRegion":"NA","Country":"US","LocationText":"Ash Hill, NC","Location":"NA-US-NC-ASH HILL","Decommisioned":false,"TaxReturnsFiled":842,"EstimatedPopulation":1666,"TotalWages":28876493}
下面的错误消息
线程“main”org.apache.spark.sql.analysisException中出现异常:未能找到数据源:Kafka。请按照“结构化流+Kafka集成指南”的部署部分部署应用程序。;在org.apache.spark.sql.execution.datasources.datasource$.lookupdatasource(datasource.scala:652),在org.apache.spark.sql.dataframereader.load(dataframereader.scala:194),在org.apache.spark.sql.dataframereader.load(dataframereader.scala:167),在streamingapp$.main(streamingapp.scala)
需要帮助阅读Kafka主题的数据。
请遵循spark streaming+kafka集成指南
https://spark.apache.org/docs/latest/structure-streaming-kafka-integration.html
您可能丢失了工件“spark-sql-kafka-0-102.12”
在我们的docker-swarm中运行kafka connect,使用以下撰写文件: kafka connect节点成功启动,我可以设置任务并查看这些任务的状态······ 我是否在撰写文件或任务配置中缺少某些配置?
我们正在使用Kafka流将数据写入接收器主题。我正在运行一个avro消费者命令行来检查接收器主题中是否有数据: bin/kafka-avro控制台-消费者-主题sink.output.topic-从开始-新消费者-引导-服务器 当我在kafka streams应用程序运行时同时运行消费者时,我会看到数据,但如果我停止消费者并在几分钟后再次运行,我不会看到任何数据。几乎没有可能: 1) 这是因为Ka
我有一个Kafka集群(版本:0.10.1.0),有9个代理和10个分区。 我尝试使用camel kafka从java应用程序中获取消息。这是我的pom。xml 这只是我使用的与骆驼Kafka相关的依赖项。下面是骆驼Kafka消费者代码。 我正在使用文档中指定的KafkaURIhttps://camel.apache.org/components/latest/kafka-component.ht
嗨,我正在阅读kafka主题,我想处理从kafka接收到的数据,例如tockenize,过滤掉不必要的数据,删除停用词,最后我想写回另一个kafka主题 然后我得到以下错误 线程"main"中的异常org.apache.spark.sql.Analysis Exception:具有流源的查询必须使用WriteStream.start()执行; 然后,我对代码进行了如下编辑,以从Kafka中读取并写
有什么方法可以让我的Kafka Stream应用程序自动从新创建的主题中读取? 即使主题是在流应用程序已经运行时创建的? 类似于在主题名称中使用通配符,如下所示: 现在,我有多个客户端将数据(都使用相同的模式)发送到它们自己的主题,我的流应用程序从这些主题中读取数据。然后,我的应用程序进行一些转换,并将结果写入单个主题。 虽然所有的客户都可以写同一个主题,但一个没有偏见的客户也可以代表其他人写。所
我想读取事务的元数据(在Kafka0.11.0.1中支持),这样我就可以确定特定事务ID的事务是否已经提交。目前,我正在从_transactional_state主题获取键和值,但它是某种编码格式。以下是我在轮询__transaction_state主题时收到的一些相同的键/值:键=10000000MMM,值=+')