当前位置: 首页 > 知识库问答 >
问题:

运行python的apache spark流示例

龙华翰
2023-03-14

我试图运行示例目录中给出的python spark流作业-

https://spark.apache.org/docs/2.1.1/streaming-programming-guide.html

"""
 Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
 Usage: kafka_wordcount.py <zk> <topic>
 To run this on your local machine, you need to setup Kafka and create a producer first, see
 http://kafka.apache.org/documentation.html#quickstart

 and then run the example
    `$ bin/spark-submit --jars \
      external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar \
      examples/src/main/python/streaming/kafka_wordcount.py \
      localhost:2181 test`
"""
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    # counts.pprint()

    ssc.start()
    ssc.awaitTermination()
bin/spark-submit --jars ../external/spark-streaming-kafka*.jar examples/src/main/python/streaming/kafka_wordcount.py localhost:2181 test
Exception in thread "Thread-3" java.lang.NoClassDefFoundError: kafka/common/TopicAndPartition

共有1个答案

越文康
2023-03-14

您需要使用spark-streaming-kafka-assemblyjar,而不是spark-streaming-kafka。assembly jar具有所有的依赖项(包括kafka客户端)。

 类似资料:
  • 我有一个项目的RDD,还有一个函数 。 收集RDD的两个小样本,然后这两个数组。这很好,但无法扩展。 有什么想法吗? 谢谢 编辑:下面是如何压缩每个分区中具有不同项数的两个示例: 关键是,虽然RDD的. zip方法不接受大小不等的分区,但迭代器的. zip方法接受(并丢弃较长迭代器的剩余部分)。

  • 我正在用Kafka设计一个spark流媒体应用程序。我有以下几个问题:我正在将数据从RDBMS表流式传输到kafka,并使用Spark consumer来使用消息,并使用Spark-SQL进行处理 问题:1。我将数据从表中流式传输到kafka as(键作为表名,值作为JSON记录形式的表数据)——这是正确的体系结构吗? 这种数据库流的架构和设计是否正常,我如何解决转换问题中的转换? 你好Piyus

  • 我有一个基于maven的scala/java混合应用程序,可以提交spar作业。我的应用程序jar“myapp.jar”在lib文件夹中有一些嵌套的jar。其中之一是“common.jar”。我在清单文件中定义了类路径属性,比如。Spark executor抛出在客户端模式下提交应用程序时出错。类(com/myapp/common/myclass.Class)和jar(common.jar)在那里

  • 我已经设置了我的第一个spark集群(1个master,2个Worker)和一个iPython笔记本服务器,我已经设置了它来访问该集群。我正在运行Anaconda的工人,以确保每个盒子上的python设置都是正确的。iPy笔记本服务器似乎已经正确设置了所有内容,并且我能够初始化Spark并推出一个作业。然而,这项工作正在失败,我不确定如何排除故障。代码如下: 这里有个错误: Py4JJavaErr

  • 您可以在Pivotal Web Services中找到部署的运行示例。在以下链接中查看它们: Zipkin表示样品中的应用程序到顶部 Zipkin为啤酒厂在PWS,其Github代码

  • 11.4 运行示例 此时我们的程序应该能够运行。由于使用了spring-boot-starter-parentPOM,因此我们有一个有用的目标run,可以使用它来启动应用程序。在项目的根目录键入mvn spring-boot:run来启动程序: $ mvn spring-boot:run . ____ _ __ _ _ /\\ / ___'_ _