当前位置: 首页 > 知识库问答 >
问题:

使用Python集成Apache Kafka和Apache Spark Streaming

宗安宁
2023-03-14

我正在尝试使用Python将Apache Kafka与Apache spark streaming集成(我对所有这些都是新手)。

    null

代码是

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

./spark-submit/root/girish/python/kafkawordcount.py本地主机:2181

我得到了这个错误

Traceback (most recent call last):
  File "/root/girish/python/kafkawordcount.py", line 28, in <module>
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
  File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/pyspark/streaming/kafka.py", line 72, in createStream
    raise e
py4j.protocol.Py4JJavaError: An error occurred while calling o23.loadClass.
: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
 ./spark-submit --jars /root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/lib/spark-streaming-kafka_2.10-1.3.1.jar,/usr/hdp/2.2.0.0-2041/kafka/libs/kafka_2.10-0.8.1.2.2.0.0-2041.jar,/usr/hdp/2.2.0.0-2041/kafka/libs/zkclient-0.3.jar,/usr/hdp/2.2.0.0-2041/kafka/libs/metrics-core-2.2.0.jar  /root/girish/python/kafkawordcount.py localhost:2181 <topic name>
File "/root/girish/python/kafkawordcount.py", line 28, in <module>
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
  File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/pyspark/streaming/kafka.py", line 67, in createStream
    jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
  File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 529, in __call__
  File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 265, in get_command_part
AttributeError: 'dict' object has no attribute '_get_object_id'

PS:我使用的是Apache Spark 1.2

共有1个答案

井宪
2023-03-14

面对同样的问题,通过添加kafka-assembly包来解决

bin/spark-submit  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1 ~/py/sparkjob.py

根据你的spark和kafka版本使用。

 类似资料:
  • 问题内容: 我试图安装有,但我在损失的如何文件需要被写入。 当安装过程之后安装完成且没有错误,但没有关于增加从1(由环境变量控制)使用OpenBLAS线程数性能下降。 我不确定OpenBLAS集成是否完善。任何人都可以提供文件来实现相同目的。 PS:OpenBLAS与基于的等其他工具包的集成,可在同一台计算机上显着提高线程数量,从而显着提高性能。 问题答案: 我只是在带有集成的a内编译,看来工作正

  • 我引用了这个git项目来使用配置单元表集成cassandra数据,我将适当的cassandra JAR复制到配置单元库文件夹中,但是在运行对cassandra的查询时,我得到了以下错误,请帮助我解决它。 https://github.com/milliondreams/hive/tree/cas-support-cql/cassandra-handler HIVE>创建外部表消息(row_key字

  • 我已经在kafka上工作了相当长的六个月,我对用户延迟和存储到主题分区中的数据有一些疑问。 问题1:最初,当我开始阅读Kafka并了解如何使用Kafka的功能时,我被教导说,一个只有一部分和一个复制因子的主题会创造奇迹。经过相当长的六个月的工作,将我的项目迁移到live之后,使用我的主题消息的消费者开始给我一个延迟。我阅读了许多关于消费者延迟的堆栈溢出答案,得出结论,如果我增加某个主题的分区和复制

  • 问题内容: 我有一个Java应用程序,需要与第三方库集成。该库是用Python编写的,对此我没有任何发言权。我正在尝试找出与之集成的最佳方法。我正在尝试JEPP(Java嵌入式Python)-以前有人使用过吗?我的另一个想法是使用JNI与Python的C绑定进行通信。 任何有关最佳方法的想法都将不胜感激。谢谢。 问题答案: 为什么不使用Jython?我唯一能想到的缺点就是你的库是否使用CPytho

  • 我已经在jenkins和ZAPI安装了Zephyr插件,在jira安装了Zephyr插件,并且能够建立连接 但是在下拉列表中我无法查看jira项目 有人能帮我解决这个问题吗?在构建后无法在下拉列表中查看项目名称

  • 本文向大家介绍Python集成开发工具Pycharm的安装和使用详解,包括了Python集成开发工具Pycharm的安装和使用详解的使用技巧和注意事项,需要的朋友参考一下 Python语言当前越来越流行,使用的人越来越多,集成开发工具pycharm是当前使用比较多的一个开发工具,掌握pycharm的安装和基本的使用非常重要。 (1)pycharm的下载。 进入到pycharm的官网:https:/

  • 问题内容: 是否可以集成Python和JavaScript?例如,假设您希望能够在JavaScript中定义类并从Python使用它们(反之亦然)。如果是这样,最好的方法是什么?我不仅对这是否可行而且对是否 有人在“严肃的”项目或产品中做到了 感兴趣。 我想举个例子,可以使用Jython和Rhino,但我很好奇是否有人真正做到了这一点,以及是否有针对其他平台的解决方案(尤其是CPython)。 问

  • 问题内容: 集成erlang和python的最佳方法是什么? 我们需要在erlang中调用python函数,并在python中调用erlang函数。目前,我们正在尝试将SOAP用作这两种语言之间的中间层,但是我们有很多“不兼容”的麻烦。您能否建议执行集成的最佳方法? 问题答案: 如erlport所述,您可以在Erlang端使用Erlang端口协议和term_to_binary / binary_t