我试图运行示例目录中给出的python spark流作业-
https://spark.apache.org/docs/2.1.1/streaming-programming-guide.html
"""
Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
Usage: kafka_wordcount.py <zk> <topic>
To run this on your local machine, you need to setup Kafka and create a producer first, see
http://kafka.apache.org/documentation.html#quickstart
and then run the example
`$ bin/spark-submit --jars \
external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar \
examples/src/main/python/streaming/kafka_wordcount.py \
localhost:2181 test`
"""
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)
zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
# counts.pprint()
ssc.start()
ssc.awaitTermination()
bin/spark-submit --jars ../external/spark-streaming-kafka*.jar examples/src/main/python/streaming/kafka_wordcount.py localhost:2181 test
Exception in thread "Thread-3" java.lang.NoClassDefFoundError: kafka/common/TopicAndPartition
您需要使用spark-streaming-kafka-assembly
jar,而不是spark-streaming-kafka
。assembly jar具有所有的依赖项(包括kafka客户端)。
我有一个项目的RDD,还有一个函数 。 收集RDD的两个小样本,然后这两个数组。这很好,但无法扩展。 有什么想法吗? 谢谢 编辑:下面是如何压缩每个分区中具有不同项数的两个示例: 关键是,虽然RDD的. zip方法不接受大小不等的分区,但迭代器的. zip方法接受(并丢弃较长迭代器的剩余部分)。
我正在用Kafka设计一个spark流媒体应用程序。我有以下几个问题:我正在将数据从RDBMS表流式传输到kafka,并使用Spark consumer来使用消息,并使用Spark-SQL进行处理 问题:1。我将数据从表中流式传输到kafka as(键作为表名,值作为JSON记录形式的表数据)——这是正确的体系结构吗? 这种数据库流的架构和设计是否正常,我如何解决转换问题中的转换? 你好Piyus
我有一个基于maven的scala/java混合应用程序,可以提交spar作业。我的应用程序jar“myapp.jar”在lib文件夹中有一些嵌套的jar。其中之一是“common.jar”。我在清单文件中定义了类路径属性,比如。Spark executor抛出在客户端模式下提交应用程序时出错。类(com/myapp/common/myclass.Class)和jar(common.jar)在那里
我已经设置了我的第一个spark集群(1个master,2个Worker)和一个iPython笔记本服务器,我已经设置了它来访问该集群。我正在运行Anaconda的工人,以确保每个盒子上的python设置都是正确的。iPy笔记本服务器似乎已经正确设置了所有内容,并且我能够初始化Spark并推出一个作业。然而,这项工作正在失败,我不确定如何排除故障。代码如下: 这里有个错误: Py4JJavaErr
您可以在Pivotal Web Services中找到部署的运行示例。在以下链接中查看它们: Zipkin表示样品中的应用程序到顶部 Zipkin为啤酒厂在PWS,其Github代码
11.4 运行示例 此时我们的程序应该能够运行。由于使用了spring-boot-starter-parentPOM,因此我们有一个有用的目标run,可以使用它来启动应用程序。在项目的根目录键入mvn spring-boot:run来启动程序: $ mvn spring-boot:run . ____ _ __ _ _ /\\ / ___'_ _