What the Spark (v2.4) program does is described by the source code below.
Problem observed:
- The console repeatedly logs "Resetting offset for partition nifi-log-batch-0 to offset 2826180", and no data is processed.
Source code:
package io.xyz.streaming

import org.apache.spark.sql.avro._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.functions._

object readKafkaJson {

  private val topic = "nifi-log-batch"
  private val kafkaUrl = "http://<hostname>:9092"
  private val chk = "/home/xyz/tmp/checkpoint"
  private val outputFileLocation = "/home/xyz/abc/data"

  private val sparkSchema = StructType(Array(
    StructField("timestamp", StringType),
    StructField("level", StringType),
    StructField("thread", StringType),
    StructField("class", StringType),
    StructField("message", StringType),
    StructField("updatedOn", StringType),
    StructField("stackTrace", StringType)))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("ConfluentConsumer")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // =================== Read Kafka data as JSON ===================
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaUrl)
      .option("startingOffsets", "latest")
      .option("subscribe", topic)
      .load()

    val dfs1 = df
      .selectExpr("CAST(value AS STRING)")
      .select(from_json(col("value"), sparkSchema).alias("my_column"))
      .select("my_column.*")

    // =================== Write to console ===================
    dfs1
      .writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}
Detailed problem log from the console:
2019-04-10 01:12:58 INFO WriteToDataSourceV2Exec:54 - Start processing data source writer: org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@622d0057. The input RDD has 0 partitions.
2019-04-10 01:12:58 INFO SparkContext:54 - Starting job: start at readKafkaJson.scala:70
2019-04-10 01:12:58 INFO DAGScheduler:54 - Job 0 finished: start at readKafkaJson.scala:70, took 0.003870 s
2019-04-10 01:12:58 INFO WriteToDataSourceV2Exec:54 - Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@622d0057 is committing.
-------------------------------------------
Batch: 0
-------------------------------------------
2019-04-10 01:12:58 INFO CodeGenerator:54 - Code generated in 41.952695 ms
+---------+-----+------+-----+-------+---------+----------+
|timestamp|level|thread|class|message|updatedOn|stackTrace|
+---------+-----+------+-----+-------+---------+----------+
+---------+-----+------+-----+-------+---------+----------+
2019-04-10 01:12:58 INFO WriteToDataSourceV2Exec:54 - Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@622d0057 committed.
2019-04-10 01:12:58 INFO SparkContext:54 - Starting job: start at readKafkaJson.scala:70
2019-04-10 01:12:58 INFO DAGScheduler:54 - Job 1 finished: start at readKafkaJson.scala:70, took 0.000104 s
2019-04-10 01:12:58 INFO CheckpointFileManager:54 - Writing atomically to file:/tmp/temporary-df2fea18-7b2f-4146-bcfd-7923cfab65e7/commits/0 using temp file file:/tmp/temporary-df2fea18-7b2f-4146-bcfd-7923cfab65e7/commits/.0.eb290a31-1965-40e7-9028-d18f2eea0627.tmp
2019-04-10 01:12:58 INFO CheckpointFileManager:54 - Renamed temp file file:/tmp/temporary-df2fea18-7b2f-4146-bcfd-7923cfab65e7/commits/.0.eb290a31-1965-40e7-9028-d18f2eea0627.tmp to file:/tmp/temporary-df2fea18-7b2f-4146-bcfd-7923cfab65e7/commits/0
2019-04-10 01:12:58 INFO MicroBatchExecution:54 - Streaming query made progress: {
"id" : "fb44fbef-5d05-4bb8-ae72-3327b98af261",
"runId" : "ececfe49-bbc6-4964-8798-78980cbec525",
"name" : null,
"timestamp" : "2019-04-10T06:12:56.414Z",
"batchId" : 0,
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"addBatch" : 1324,
"getBatch" : 10,
"getEndOffset" : 1,
"queryPlanning" : 386,
"setOffsetRange" : 609,
"triggerExecution" : 2464,
"walCommit" : 55
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[nifi-log-batch]]",
"startOffset" : null,
"endOffset" : {
"nifi-log-batch" : {
"0" : 2826180
}
},
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@6ced6212"
}
}
2019-04-10 01:12:58 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-9a027b2b-0a3a-4773-a356-a585e488062c--81433247-driver-0] Resetting offset for partition nifi-log-batch-0 to offset 2826180.
2019-04-10 01:12:58 INFO MicroBatchExecution:54 - Streaming query made progress: {
"id" : "fb44fbef-5d05-4bb8-ae72-3327b98af261",
"runId" : "ececfe49-bbc6-4964-8798-78980cbec525",
"name" : null,
"timestamp" : "2019-04-10T06:12:58.935Z",
"batchId" : 1,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"getEndOffset" : 1,
"setOffsetRange" : 11,
"triggerExecution" : 15
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[nifi-log-batch]]",
"startOffset" : {
"nifi-log-batch" : {
"0" : 2826180
}
},
"endOffset" : {
"nifi-log-batch" : {
"0" : 2826180
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@6ced6212"
}
}
2019-04-10 01:12:58 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-9a027b2b-0a3a-4773-a356-a585e488062c--81433247-driver-0] Resetting offset for partition nifi-log-batch-0 to offset 2826180.
2019-04-10 01:12:58 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-9a027b2b-0a3a-4773-a356-a585e488062c--81433247-driver-0] Resetting offset for partition nifi-log-batch-0 to offset 2826180.
2019-04-10 01:12:58 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-9a027b2b-0a3a-4773-a356-a585e488062c--81433247-driver-0] Resetting offset for partition nifi-log-batch-0 to offset 2826180.
I face the same issue even when I run equivalent code in PySpark.
Please suggest how to resolve this issue.
Kafka: v2.1.0 (Confluent)
Spark: 2.4
The job is submitted via the following command:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 --jars /home/xyz/Softwares/spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar --class io.xyz.streaming.readKafkaJson --master local[*] /home/xyz/ScalaCode/target/SparkSchemaKafka-0.0.1-SNAPSHOT-jar-with-dependencies.jar
The asker appears to have found a solution; the relevant parts from the comments are below:
Main resolution
It was a problem with the schema structure in the Scala code. After correcting the schema, the issue was resolved.
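A schema mismatch is easy to miss here because `from_json` never throws: a malformed record becomes a null struct, and a field that is absent from the payload becomes a null column, so the query runs "successfully" while producing empty-looking rows. The following is a minimal plain-Python sketch of that behavior (it mimics `from_json`, it is not Spark itself; the field names are taken from the schema in the question):

```python
import json

# Field names from the StructType in the question.
SCHEMA = ["timestamp", "level", "thread", "class",
          "message", "updatedOn", "stackTrace"]

def parse_like_from_json(value, fields):
    """Mimic Spark's from_json semantics: a record that cannot be
    parsed yields None (a null struct); a missing field yields None
    (a null column) instead of an error."""
    try:
        obj = json.loads(value)
    except (ValueError, TypeError):
        return None
    if not isinstance(obj, dict):
        return None
    return {f: obj.get(f) for f in fields}

good = parse_like_from_json('{"timestamp": "t1", "level": "INFO"}', SCHEMA)
bad = parse_like_from_json('not json at all', SCHEMA)
```

Here `good` is a full row with nulls for the fields the payload did not carry, and `bad` is a null row, which is why a wrong schema in the real job shows up as silently empty output rather than as an exception.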
Secondary topic
In the PySpark code, processing was actually happening, but the messages never stopped; that is, I was able to run the code and write the streaming data to JSON files, yet the console kept filling with the "Resetting offset" log messages quoted above.
The PySpark problem was really just that these INFO messages were being printed; I disabled them and after that everything was fine.
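The asker did not say exactly how they disabled the messages; one common way (a sketch assuming Spark's stock log4j 1.2 setup in Spark 2.4) is to raise the log level in `conf/log4j.properties`:

```
# conf/log4j.properties -- raise the root level so INFO lines are no longer printed
log4j.rootCategory=WARN, console

# or, more surgically, silence only the Kafka consumer class that emits
# the "Resetting offset for partition ..." messages seen in the log above
log4j.logger.org.apache.kafka.clients.consumer.internals.Fetcher=WARN
```

Alternatively, the level can be raised from inside the application with `spark.sparkContext.setLogLevel("WARN")` after the session is created.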