Question:

Unable to consume data from a Kafka topic in Spark

慕翰学
2023-03-14
name := "Sample_streaming"
     
version := "0.1"
     
scalaVersion := "2.11.12"
     
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.2"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % 2.3.2"
libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % "2.0.0" % Test
libraryDependencies += "com.github.harbby" % "spark-sql-kafka-0-8" % "1.0.1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
// libraryDependencies += "org.slf4j" % "slf4j-log4j12" % "1.7.16" % Test
libraryDependencies += "commons-logging" % "commons-logging" % "1.2"
dependencyOverrides ++= {
  Seq(
    "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.7.1",
    "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7",
    "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7"
  )
}
package com.spark.learning
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
     
object kafkar {
    def main(args: Array[String]) {
        Logger.getLogger("org").setLevel(Level.OFF)
        Logger.getLogger("akka").setLevel(Level.OFF)

        println("program started")

        val conf = new SparkConf().setMaster("spark://192.1xx.x.xxx:7077").setAppName("kafkar")
        println("master set")
        val ssc = new StreamingContext(conf, Seconds(2))
        println("Streaming Context set")

        val kafkaStream = KafkaUtils.createStream(ssc, "192.xxx.x.xxx:2181", "test-consumer-group", Map("demo1" -> 5))
        println("Kafka stream set")

        kafkaStream.print()
        println("Kafka stream printed")
        ssc.start
        ssc.awaitTermination()
    }
}

When I run this through SBT, I get the following exceptions/errors:

[root@hadoop-single Sample_streaming]# sbt compile
   [info] welcome to sbt 1.3.13 (Oracle Corporation Java 1.8.0_252)
    [info] loading project definition from /root/Sample_streaming/project
    [info] loading settings for project sample_streaming from build.sbt ...
    [info] set current project to Sample_streaming (in build file:/root/Sample_streaming/)
    [info] running com.spark.learning.kafkar 
    program started
    master set
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    20/07/16 06:00:48 INFO SparkContext: Running Spark version 2.3.2
    20/07/16 06:00:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    20/07/16 06:00:54 INFO SecurityManager: Changing modify acls groups to: 
    20/07/16 06:00:54 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
    20/07/16 06:00:57 INFO Utils: Successfully started service 'sparkDriver' on port 46733.
    20/07/16 06:00:57 INFO SparkEnv: Registering MapOutputTracker
    20/07/16 06:00:57 INFO SparkEnv: Registering BlockManagerMaster
    20/07/16 06:00:57 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
    20/07/16 06:00:57 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
    20/07/16 06:00:57 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-xxxxx-xxxx-xxxx
    20/07/16 06:00:57 INFO MemoryStore: MemoryStore started with capacity 415.5 MB
    20/07/16 06:00:58 INFO SparkEnv: Registering OutputCommitCoordinator
    20/07/16 06:00:59 INFO Utils: Successfully started service 'SparkUI' on port 4040.
    20/07/16 06:00:59 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://hadoop-single.accesshost.internal:4040
    20/07/16 06:01:01 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://192.1xx.x.xx:7077...
    20/07/16 06:01:01 INFO TransportClientFactory: Successfully created connection to /192.1xx.x.xx:7077 after 401 ms (0 ms spent in bootstraps)
    20/07/16 06:01:02 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20200716060102-0010
    20/07/16 06:01:02 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44965.
    20/07/16 06:01:02 INFO NettyBlockTransferService: Server created on hadoop-single.accesshost.internal:44965
    20/07/16 06:01:02 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
    20/07/16 06:01:03 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, hadoop-single.accesshost.internal, 44965, None)
    20/07/16 06:01:03 INFO BlockManagerMasterEndpoint: Registering block manager hadoop-single.accesshost.internal:44965 with 415.5 MB RAM, BlockManagerId(driver, hadoop-single.accesshost.internal, 44965, None)
    20/07/16 06:01:03 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, hadoop-single.accesshost.internal, 44965, None)
    20/07/16 06:01:03 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, hadoop-single.accesshost.internal, 44965, None)
    20/07/16 06:01:04 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
    Streaming Context set
    [error] (run-main-0) java.lang.NoClassDefFoundError: org/apache/spark/Logging
    [error] java.lang.NoClassDefFoundError: org/apache/spark/Logging
    [error]     at java.lang.ClassLoader.defineClass1(Native Method)
    [error]     at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    [error]     at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    [error]     at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    [error]     at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    [error]     at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    [error]     at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    [error]     at java.security.AccessController.doPrivileged(Native Method)
    [error]     at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    [error] Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
    [error]     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    [error] stack trace is suppressed; run last Compile / bgRun for the full output
    20/07/16 06:01:05 ERROR Utils: uncaught error in thread spark-listener-group-shared, stopping SparkContext
    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:88)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
    org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:181)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
        at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
        at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73)
    20/07/16 06:01:05 ERROR Utils: uncaught error in thread spark-listener-group-appStatus, stopping SparkContext
    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
    20/07/16 06:01:05 ERROR Utils: uncaught error in thread spark-listener-group-executorManagement, stopping SparkContext
    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
    20/07/16 06:01:05 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-executorManagement
    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
    20/07/16 06:01:05 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-appStatus
    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
     org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
    20/07/16 06:01:05 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-shared
    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
    20/07/16 06:01:05 INFO SparkContext: SparkContext already stopped.
    20/07/16 06:01:05 INFO SparkContext: SparkContext already stopped.
    20/07/16 06:01:05 INFO SparkUI: Stopped Spark web UI at http://hadoop-single.accesshost.internal:4040
    20/07/16 06:01:05 INFO StandaloneSchedulerBackend: Shutting down all executors
    [error] Nonzero exit code: 1
    [error] (Compile / run) Nonzero exit code: 1
    [error] Total time: 41 s, completed Jul 16, 2020 6:01:06 AM
    20/07/16 06:01:06 INFO DiskBlockManager: Shutdown hook called
    20/07/16 06:01:06 INFO ShutdownHookManager: Shutdown hook called
    20/07/16 06:01:06 INFO ShutdownHookManager: Deleting directory /tmp/spark-xxxx
    20/07/16 06:01:06 INFO ShutdownHookManager: Deleting directory /tmp/spark-xxxx/userFiles-xxxx

1 Answer

季嘉良
2023-03-14

It looks like you are using a few redundant dependencies, and a few of them are outdated; in particular, spark-streaming-kafka 1.6.x targets Spark 1.x and relies on the org.apache.spark.Logging class that no longer exists in Spark 2.x, which is the most likely source of the NoClassDefFoundError above. Please use the code below to consume data from the Kafka topic.

Dependencies

private lazy val sparkStreamingVersion = "2.3.4"
private lazy val kafkaVersion = "2.3.0"
private lazy val excludeJpountz = ExclusionRule(organization = "net.jpountz.lz4", name = "lz4")

private lazy val sparkStreaming = "org.apache.spark" %% "spark-streaming" % sparkStreamingVersion
private lazy val sparkStreamingKafka = "org.apache.spark" %% "spark-streaming-kafka-0-10" % kafkaVersion excludeAll excludeJpountz

libraryDependencies ++= Seq(
  sparkStreaming,
  sparkStreamingKafka
)
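
If you prefer the flat build.sbt style from the question rather than the lazy val helpers above, the same dependencies could be wired in roughly like this (a minimal sketch; the project name and Scala version are carried over from the question, the versions and exclusion rule from the snippet above):

// build.sbt -- minimal sketch, assuming the settings shown elsewhere in this thread
name := "Sample_streaming"
version := "0.1"
scalaVersion := "2.11.12"

// Only the Kafka 0.10+ integration is needed; the old spark-streaming-kafka 1.6.x,
// spark-sql-kafka-0-8 and the explicit kafka-clients pin can be dropped.
// (Keep spark-core / spark-sql on the same 2.3.x line if you still need them.)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.3.4",
  ("org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0")
    .excludeAll(ExclusionRule(organization = "net.jpountz.lz4", name = "lz4"))
)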

The following should work fine.

// Imports required for the spark-streaming-kafka-0-10 direct stream API:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreaming with Kafka")
    // Streaming context with batch interval = 10 seconds
    val streamingContext = new StreamingContext(conf, Seconds(10))

    val bootStrapServers = "bootstrap_server_names"

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> bootStrapServers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("topic_name")

    // Direct Kafka stream
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Print topic, value, partition and offset of each record
    stream.map { record => (record.topic(), record.value(), record.partition(), record.offset()) }.print()

    streamingContext.start()            // Start the computation
    streamingContext.awaitTermination() // Wait for the computation to terminate
  }
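
Note that with enable.auto.commit set to false, offsets are never committed back to Kafka, so a restarted job will fall back to auto.offset.reset. If you need to resume from the last processed position, one common pattern with this API (a sketch only; the processing step is a placeholder) is to commit the offset ranges yourself after each batch:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Capture the offset ranges of this batch before processing it.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the records in rdd here (placeholder) ...

  // Commit the offsets back to Kafka once the batch has been handled.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}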
PreferConsistent above is one of the available location strategies:

 1. PreferConsistent - distributes partitions evenly across the available
    executors.
 2. PreferBrokers - if your executors are on the same hosts as your Kafka
    brokers, use PreferBrokers, which prefers to schedule each partition on
    the Kafka leader for that partition.
 3. PreferFixed - if you have a significant skew in load among partitions,
    use PreferFixed. It lets you specify an explicit mapping of partitions
    to hosts (any unspecified partitions will use a consistent location).

Subscribe above is one of the available consumer strategies:

 1. Subscribe - lets you subscribe to a fixed collection of topics.
 2. SubscribePattern - lets you use a regex to specify the topics of interest.
 3. Assign - lets you specify a fixed collection of partitions (see the sketch
    after this list).
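
For reference, here is a rough sketch of the SubscribePattern and Assign strategies used in place of Subscribe above; the regex pattern, topic name and partition numbers are made-up placeholders, and streamingContext/kafkaParams are the values defined earlier:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies.{Assign, SubscribePattern}

// SubscribePattern: consume every topic whose name matches a regex (placeholder pattern).
val patternStream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  SubscribePattern[String, String](java.util.regex.Pattern.compile("demo.*"), kafkaParams)
)

// Assign: consume an explicit, fixed set of partitions (placeholder topic and partitions).
val partitions = Array(new TopicPartition("topic_name", 0), new TopicPartition("topic_name", 1))
val assignStream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](partitions, kafkaParams)
)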