Question:

How do I pass data from Kafka to Spark Streaming?

凌经赋
2023-03-14

I am trying to pass data from Kafka to Spark Streaming.

This is what I have done so far:

from __future__ import print_function
import sys
from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    #conf = SparkConf().setAppName("Kafka-Spark").setMaster("spark://127.0.0.1:7077")
    conf = SparkConf().setAppName("Kafka-Spark")
    #sc = SparkContext(appName="KafkaSpark")
    sc = SparkContext(conf=conf)
    stream = StreamingContext(sc, 1)  # 1-second batch interval
    map1 = {'spark-kafka': 1}  # topic name -> number of partitions to consume
    kafkaStream = KafkaUtils.createStream(stream, 'localhost:9092', "name", map1)  # tried with localhost:2181 too

    print("kafkastream=", kafkaStream)
    sc.stop()
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/01/18 13:05:33 INFO SparkContext: Running Spark version 1.6.0
16/01/18 13:05:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/18 13:05:33 INFO SecurityManager: Changing view acls to: username
16/01/18 13:05:33 INFO SecurityManager: Changing modify acls to: username
16/01/18 13:05:33 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(username); users with modify permissions: Set(username)
16/01/18 13:05:33 INFO Utils: Successfully started service 'sparkDriver' on port 54446.
16/01/18 13:05:34 INFO Slf4jLogger: Slf4jLogger started
16/01/18 13:05:34 INFO Remoting: Starting remoting
16/01/18 13:05:34 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@127.0.0.1:50386]
16/01/18 13:05:34 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 50386.
16/01/18 13:05:34 INFO SparkEnv: Registering MapOutputTracker
16/01/18 13:05:34 INFO SparkEnv: Registering BlockManagerMaster
16/01/18 13:05:34 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-f5490271-cdb7-467d-a915-4f5ccab57f0e
16/01/18 13:05:34 INFO MemoryStore: MemoryStore started with capacity 511.1 MB
16/01/18 13:05:34 INFO SparkEnv: Registering OutputCommitCoordinator
16/01/18 13:05:34 INFO Server: jetty-8.y.z-SNAPSHOT
16/01/18 13:05:34 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
16/01/18 13:05:34 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/01/18 13:05:34 INFO SparkUI: Started SparkUI at http://127.0.0.1:4040
Java HotSpot(TM) Server VM warning: You have loaded library /tmp/libnetty-transport-native-epoll561240765619860252.so which might have disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
16/01/18 13:05:34 INFO Utils: Copying ~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py to /tmp/spark-18227081-a1c8-43f2-8ca7-cfc4751f023f/userFiles-e93fc252-0ba1-42b7-b4fa-2e46f3a0601e/kafka-spark.py
16/01/18 13:05:34 INFO SparkContext: Added file file:~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py at file:~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py with timestamp 1453118734892
16/01/18 13:05:35 INFO Executor: Starting executor ID driver on host localhost
16/01/18 13:05:35 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 58970.
16/01/18 13:05:35 INFO NettyBlockTransferService: Server created on 58970
16/01/18 13:05:35 INFO BlockManagerMaster: Trying to register BlockManager
16/01/18 13:05:35 INFO BlockManagerMasterEndpoint: Registering block manager localhost:58970 with 511.1 MB RAM, BlockManagerId(driver, localhost, 58970)
16/01/18 13:05:35 INFO BlockManagerMaster: Registered BlockManager

________________________________________________________________________________________________

  Spark Streaming's Kafka libraries not found in class path. Try one of the following.

  1. Include the Kafka library and its dependencies with in the
     spark-submit command as

     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:1.6.0 ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = 1.6.0.
     Then, include the jar in the spark-submit command as

     $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...

________________________________________________________________________________________________


Traceback (most recent call last):
  File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py", line 33, in <module>
    kafkaStream = KafkaUtils.createStream(stream, 'localhost:9092', "name", map1)
  File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 80, in createStream
py4j.protocol.Py4JJavaError: An error occurred while calling o22.loadClass.
: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

16/01/18 13:05:35 INFO SparkContext: Invoking stop() from shutdown hook
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/api,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/static,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/threadDump/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/threadDump,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/job/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/job,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs,null}
16/01/18 13:05:35 INFO SparkUI: Stopped Spark web UI at http://127.0.0.1:4040
16/01/18 13:05:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/01/18 13:05:35 INFO MemoryStore: MemoryStore cleared
16/01/18 13:05:35 INFO BlockManager: BlockManager stopped
16/01/18 13:05:35 INFO BlockManagerMaster: BlockManagerMaster stopped
16/01/18 13:05:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/01/18 13:05:35 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/01/18 13:05:35 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/01/18 13:05:35 INFO SparkContext: Successfully stopped SparkContext
16/01/18 13:05:35 INFO ShutdownHookManager: Shutdown hook called
16/01/18 13:05:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-18227081-a1c8-43f2-8ca7-cfc4751f023f
16/01/18 13:05:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-18227081-a1c8-43f2-8ca7-cfc4751f023f/pyspark-fcd47a97-57ef-46c3-bb16-357632580334
kafkastream= <pyspark.streaming.dstream.TransformedDStream object at 0x7fd6c4dad150>

1 Answer

薛征
2023-03-14

You need to submit spark-streaming-kafka-assembly_*.jar along with your job:

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.5.2.jar ./spark-kafka.py 
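
Equivalently, as the error banner in the log suggests, you can let spark-submit resolve the Kafka integration from Maven Central with --packages instead of shipping a jar by hand. A minimal sketch, assuming Spark 1.6.0 built against Scala 2.10 (the versions visible in the log above; adjust the coordinates if yours differ):

# Resolve the Kafka streaming integration at submit time instead of passing a local jar.
# Coordinates assume Spark 1.6.0 / Scala 2.10 -- match them to your own build.
bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 ./spark-kafka.py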
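
Note that even once the jar is on the classpath, the script as posted only prints the DStream object and immediately stops the context, so nothing is ever consumed. To actually receive messages you need an output operation plus ssc.start()/ssc.awaitTermination(), and createStream's second argument is the ZooKeeper quorum (typically localhost:2181), not the Kafka broker. A minimal sketch under those assumptions, reusing the spark-kafka topic from the question (the consumer group name here is arbitrary):

from __future__ import print_function

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    conf = SparkConf().setAppName("Kafka-Spark")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 1)  # 1-second batch interval

    # createStream expects the ZooKeeper quorum (not the Kafka broker),
    # a consumer group id, and a dict of {topic: partitions per receiver}.
    kafkaStream = KafkaUtils.createStream(
        ssc, 'localhost:2181', 'spark-consumer-group', {'spark-kafka': 1})

    # Messages arrive as (key, value) pairs; print each batch's values.
    kafkaStream.map(lambda kv: kv[1]).pprint()

    ssc.start()             # start receiving
    ssc.awaitTermination()  # run until killed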