当前位置: 首页 > 知识库问答 >
问题:

结构化流Kafka 2.1->Zeppelin 0.8->spark 2.4:spark不使用jar

班宏毅
2023-03-14

我有一个Kafka2.1消息代理,希望在Spark2.4中对消息的数据进行一些处理。我想使用齐柏林0.8.1笔记本快速原型。

我下载了结构化流所必需的spark-streaming-kafka-0-102.11.jar(http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html),并将其作为“dependencies-artifact”添加到Zeppelin的“Spark”解释器(也处理%pyspark段落)。我重新启动了这个解释器(还有齐柏林飞艇)。

我还在第一个笔记本的一段加载了jar(我首先认为这应该是没有必要的……):

%dep z.load("/usr/local/analyse/jar/spark-streaming-kafka-0-10_2.11.jar")
res0: org.apache.zeppelin.dep.Dependency = org.apache.zeppelin.dep.Dependency@2b65d5

所以,我没有错误,所以加载似乎可以工作。现在,我要做的测试,Kafka服务器运行在同一台机器上使用这个端口,还有一个主题“测试”:

%pyspark
# Subscribe to a topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test") \
  .load()

但我发现了错误

无法执行第6行:.option(“subscribe”,“test”)\Traceback(最近一次调用):文件“/usr/local/analython/spark/python/lib/pyspark.zip/pyspark/sql/utils.py”,第63行,deco return f(*a,**kw)文件“/usr/local/analyse/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py”,第328行,get_return_value格式(target_id,“.”,nameMethodAccessorImpl.java:43)在java.lang.Reflect.Method.Invoke(method.java:498)在PY4J.Reflection.MethodInvoker.Invoke(methodInvoker.java:244)在PY4J.Reflection.ReflectionEngine.Invoke(methodInvoker.java:357)在PY4J.Gateway.Invoke(methodInvoker.java:357)在PY4J.Gateway.Invoke(gateway.java:282)在

在处理上述异常时,又发生了一个异常:

回溯(最近一次调用):文件“/tmp/zeppelin_pyspark-312826888257172599.py”,第380行,在exec(代码,_zcUserQueryNamespace)文件“”中,第6行,在文件“/usr/local/analyth/spark/python/lib/pyspark.zip/pyspark/sql/Streaming.py”中,第400行,在load return self._df(self._jreader.load())文件“

我想知道,因为至少有一个调整(解释器配置或直接加载)应该起作用了。

我还在控制台上尝试了spark-submit--jar/usr/local/analyse/jar/spark-streaming-kafka-0-102.11.jar,但这似乎只有在提交程序时才起作用。

因此,我还将spark-streaming-kafka-0-102.11.jar复制到/usr/local/analysis/spark/jars/中,所有其他的spark都在这里。但是在重新启动(spark和zeppelin)之后,我总是得到同样的错误。

同时,我发现我可以在webbrowser中查看spark的环境变量,并且在“classpath entries”部分中找到了spark-streaming-kafka-0-102.11.jar,其源代码为“system classpath”,也为“added By user”(似乎是Zeppelin的解释器部分中的工件)。看来我的前两次尝试应该奏效了。

共有1个答案

聂和宜
2023-03-14

第一个问题是,您已经下载了spark streaming的包,但尝试创建一个结构化的streaming对象(使用readstream())。请记住,火花流和火花结构化流是两回事,需要区别对待。

对于结构化流,您需要下载包spark-sql-kafka-0-102.11及其依赖项kafka-clients、slf4j-api、snappy-java、lz4-java和unused。您的依赖项部分应该如下所示以加载所有所需的包:

z.load("/tmp/spark-sql-kafka-0-10_2.11-2.4.0.jar")
z.load("/tmp/kafka-clients-2.0.0.jar")
z.load("/tmp/lz4-java-1.4.0.jar")
z.load("/tmp/snappy-java-1.1.7.1.jar")
z.load("/tmp/unused-1.0.0.jar")
z.load("/tmp/slf4j-api-1.7.16.jar")
 类似资料:
  • 在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之

  • 我是Kafka流媒体的新手。我使用python设置了一个twitter监听器,它运行在localhost:9092kafka服务器中。我可以使用kafka客户端工具(conduktor)并使用命令“bin/kafka-console-consumer.sh--bootstrap-server localhost:9092-topic twitter--from-begind”来使用侦听器生成的流,

  • 我正在使用Kafka和Spark 2.1结构化流。我有两个json格式的数据主题,例如: 我需要比较Spark中基于标记的两个流:name,当值相等时,执行一些额外的定义/函数。 如何使用Spark结构化流来做到这一点? 谢谢

  • 场景与经典的流连接略有不同 交易流: transTS, userid, productid,... streamB:创建的新产品流:productid、productname、createTS等) 我想加入与产品的交易,但我找不到水印/加入条件的组合来实现这一点。 结果为空。 我做错了什么?

  • 我使用结构化流媒体(Spark 2.0.2)来消费Kafka消息。使用scalapb,protobuf中的消息。我得到以下错误。请帮助。。 线程“main”scala中的异常。ScalaRefltionException:不是一个术语org.apache.spark.sql.catalyst.符号$SymbolApi$9.apply术语(Seflection.scala:592)org.apach