Question:

Spark Streaming is not reading from a Kafka topic

童宏富
2023-03-14

I have set up Kafka and Spark on Ubuntu. I am trying to read a Kafka topic through Spark Streaming using PySpark (Jupyter notebook). Spark neither reads the data nor throws any error.


Kafka producer: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic new_topic

Kafka consumer: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic new_topic --from-beginning
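To push test messages into `new_topic` from the notebook itself instead of a second terminal, the two console commands above can be driven through `subprocess`. This is a minimal sketch, not part of the original setup: the install path `/opt/kafka` and the helper names are hypothetical, and it relies on the console producer sending each stdin line as one message and exiting at end of input.

```python
import subprocess

KAFKA_HOME = "/opt/kafka"  # hypothetical install path; adjust to your Kafka directory


def producer_cmd(broker, topic, kafka_home=KAFKA_HOME):
    """argv for kafka-console-producer.sh, using --broker-list as in the question."""
    return [f"{kafka_home}/bin/kafka-console-producer.sh",
            "--broker-list", broker, "--topic", topic]


def consumer_cmd(broker, topic, kafka_home=KAFKA_HOME):
    """argv for kafka-console-consumer.sh, replaying the topic from the earliest offset."""
    return [f"{kafka_home}/bin/kafka-console-consumer.sh",
            "--bootstrap-server", broker, "--topic", topic, "--from-beginning"]


def send_messages(messages, broker="localhost:9092", topic="new_topic"):
    """Pipe one message per line into the console producer and wait for it to exit."""
    payload = "".join(f"{m}\n" for m in messages)
    subprocess.run(producer_cmd(broker, topic), input=payload, text=True, check=True)
```

With a broker running, `send_messages(["hello", "world"])` should make two values appear in the next non-empty batch of a working stream. Recent Kafka releases also accept `--bootstrap-server` for the producer.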

PySpark code (Jupyter notebook):

#!/usr/bin/env python
# coding: utf-8
import os

os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'

# Pull in the Kafka 0.8 DStream integration; the value must end with "pyspark-shell"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.2 pyspark-shell'

import findspark
findspark.init('/home/shekhar/spark-2.3.2-bin-hadoop2.7')
import pyspark
import sys

from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext

from pyspark.streaming.kafka import KafkaUtils

from uuid import uuid1

if __name__=="__main__":
    #sconf = SparkConf().setAppName("SparkStr").setMaster("local")
    sc = SparkContext(appName="SparkStreamingReceiverKafkaWordCount")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc,2)   # 2-second batch interval
    broker,topic = sys.argv[1:]
    # Receiver-based stream: createStream(ssc, zkQuorum, groupId, {topic: numThreads})
    kvs = KafkaUtils.createStream(ssc,"localhost:9092","raw-event-streaming-consumer",{topic:1})
    lines = kvs.map(lambda x: x[1])   # keep only the message value
    lines.pprint()
    ssc.start()
    ssc.awaitTermination()
Output (every batch is empty):

-------------------------------------------
Time: 2019-01-30 00:52:18
-------------------------------------------

-------------------------------------------
Time: 2019-01-30 00:52:20
-------------------------------------------

spark-submit command:

bin/spark-submit \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.2 SparkKafka-Copy1.py localhost:9092 new_topic \
  --master spark://localhost:4040
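`spark-submit` expects `spark-submit [options] <app file> [app arguments]`: everything after the application file is passed to the script, so in the command above `--master spark://localhost:4040` ends up in `sys.argv`, and `broker,topic = sys.argv[1:]` would receive four values instead of two. Port 4040 is also normally the application's web UI; a standalone master listens on 7077 by default. The sketch below builds the command in the expected order. The `local[2]` master is an assumption, chosen because a receiver-based stream permanently occupies one core, so local mode needs at least two for batches to be processed; the helper name is hypothetical.

```python
import shlex

PACKAGE = "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.2"


def spark_submit_argv(app_file, app_args, master="local[2]", packages=(PACKAGE,)):
    """Build spark-submit argv: Spark options first, then the app file, then its arguments."""
    argv = ["bin/spark-submit", "--master", master]
    if packages:
        argv += ["--packages", ",".join(packages)]
    # Everything after app_file is handed to the script as sys.argv[1:]
    return argv + [app_file, *app_args]


argv = spark_submit_argv("SparkKafka-Copy1.py", ["localhost:9092", "new_topic"])
print(shlex.join(argv))  # shell-quoted command line (shlex.join needs Python 3.8+)
```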

The output of spark-submit in the terminal is as follows:

Ivy Default Cache set to: /home/shekhar/.ivy2/cache
The jars for the packages stored in: /home/shekhar/.ivy2/jars
:: loading settings :: url = jar:file:/home/shekhar/spark-2.3.2-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-streaming-kafka-0-8_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0698f154-2d3f-4d56-b2c5-099190b947df;1.0
    confs: [default]
    found org.apache.spark#spark-streaming-kafka-0-8_2.11;2.3.2 in central
    found org.apache.kafka#kafka_2.11;0.8.2.1 in central
    found org.scala-lang.modules#scala-xml_2.11;1.0.2 in central
    found com.yammer.metrics#metrics-core;2.2.0 in central
    found org.slf4j#slf4j-api;1.7.16 in central
    found org.scala-lang.modules#scala-parser-combinators_2.11;1.0.4 in central
    found com.101tec#zkclient;0.3 in central
    found log4j#log4j;1.2.17 in central
    found org.apache.kafka#kafka-clients;0.8.2.1 in central
    found net.jpountz.lz4#lz4;1.2.0 in central
    found org.xerial.snappy#snappy-java;1.1.2.6 in central
    found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 617ms :: artifacts dl 19ms
    :: modules in use:
    com.101tec#zkclient;0.3 from central in [default]
    com.yammer.metrics#metrics-core;2.2.0 from central in [default]
    log4j#log4j;1.2.17 from central in [default]
    net.jpountz.lz4#lz4;1.2.0 from central in [default]
    org.apache.kafka#kafka-clients;0.8.2.1 from central in [default]
    org.apache.kafka#kafka_2.11;0.8.2.1 from central in [default]
    org.apache.spark#spark-streaming-kafka-0-8_2.11;2.3.2 from central in [default]
    org.scala-lang.modules#scala-parser-combinators_2.11;1.0.4 from central in [default]
    org.scala-lang.modules#scala-xml_2.11;1.0.2 from central in [default]
    org.slf4j#slf4j-api;1.7.16 from central in [default]
    org.spark-project.spark#unused;1.0.0 from central in [default]
    org.xerial.snappy#snappy-java;1.1.2.6 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   12  |   0   |   0   |   0   ||   12  |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-0698f154-2d3f-4d56-b2c5-099190b947df
    confs: [default]
    0 artifacts copied, 12 already retrieved (0kB/25ms)
2019-01-30 18:40:19 WARN  Utils:66 - Your hostname, shekhar-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
2019-01-30 18:40:19 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2019-01-30 18:40:19 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.io.IOException: Cannot run program "python": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.spark.deploy.PythonRunner$.main(PythonRunner.scala:100)
    at org.apache.spark.deploy.PythonRunner.main(PythonRunner.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 12 more
2019-01-30 18:40:19 INFO  ShutdownHookManager:54 - Shutdown hook called
2019-01-30 18:40:19 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-e6d0532c-3593-4c28-8bb6-6d48aedb12f3
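The key line is `Cannot run program "python"`. In Spark 2.x, `spark-submit` launches the Python driver with the interpreter named by the `spark.pyspark.driver.python` / `spark.pyspark.python` settings, then the `PYSPARK_DRIVER_PYTHON` / `PYSPARK_PYTHON` environment variables, and otherwise falls back to plain `python`, which is missing on a system that only has `python3`. The `os.environ['PYSPARK_PYTHON']` assignment inside the script cannot influence this, because it only runs after the interpreter has started. The sketch below mirrors just the environment-variable part of that lookup (a simplification; the Spark config keys are ignored) so you can check what would be launched; the function names are hypothetical.

```python
import os
import shutil


def driver_python(env=None):
    """Interpreter name spark-submit (Spark 2.x) would try for the Python driver.

    Simplified: only environment variables are checked; in Spark the
    spark.pyspark.driver.python / spark.pyspark.python configs take precedence.
    """
    env = os.environ if env is None else env
    for var in ("PYSPARK_DRIVER_PYTHON", "PYSPARK_PYTHON"):
        if var in env:
            return env[var]
    return "python"  # Spark 2.x fallback: the name shown in the error message


def check_driver_python(env=None):
    """Return (name, resolved path or None) for the interpreter that would be launched."""
    name = driver_python(env)
    return name, shutil.which(name)


if __name__ == "__main__":
    name, path = check_driver_python()
    print(f"{name!r} -> {path or 'NOT FOUND on PATH'}")
```

If this reports `NOT FOUND`, exporting `PYSPARK_PYTHON=python3` in the shell that runs `spark-submit` is one way to make the lookup succeed.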

1 answer

仲绍晖
2023-03-14

It's solved now. I had to set PYTHONPATH and add it to PATH in my .bashrc file.

PYTHONPATH=/usr/bin/python3
export PATH=$PATH:$PYTHONPATH/bin

In the main function, the ZooKeeper port passed to createStream was changed to 2181; I had mistakenly given 9092 (the Kafka broker port).
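The receiver-based `KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)` connects to ZooKeeper (default port 2181) to locate the brokers, whereas `KafkaUtils.createDirectStream(ssc, topics, kafkaParams)` talks to the brokers on 9092 directly and uses no receiver. A minimal sketch of both call shapes, assuming Spark 2.3 with the `spark-streaming-kafka-0-8` package from the question (this Python API was removed in Spark 3.0) and local ZooKeeper and Kafka on their default ports; the helper names are hypothetical, and pyspark is imported lazily so the argument helpers work without it.

```python
ZK_QUORUM = "localhost:2181"    # ZooKeeper: used by the receiver-based createStream
BROKER_LIST = "localhost:9092"  # Kafka broker: used by createDirectStream


def receiver_args(topic, group_id="raw-event-streaming-consumer", threads=1):
    """Positional arguments after ssc for KafkaUtils.createStream."""
    return ZK_QUORUM, group_id, {topic: threads}


def direct_args(topic):
    """Positional arguments after ssc for KafkaUtils.createDirectStream."""
    return [topic], {"metadata.broker.list": BROKER_LIST}


def run(topic, use_direct=True, batch_seconds=2):
    # Imported here so the helpers above can be used without pyspark installed
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="KafkaStreamSketch")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, batch_seconds)
    if use_direct:
        kvs = KafkaUtils.createDirectStream(ssc, *direct_args(topic))
    else:
        kvs = KafkaUtils.createStream(ssc, *receiver_args(topic))
    kvs.map(lambda kv: kv[1]).pprint()  # print the message values of each batch
    ssc.start()
    ssc.awaitTermination()
```

`run("new_topic", use_direct=False)` corresponds to the question's receiver-based setup with the corrected ZooKeeper address; `use_direct=True` avoids ZooKeeper and the dedicated receiver core altogether.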

 Similar questions:
  • We sent 15 records from Kafka to Spark Streaming, but Spark only received 11 records. I'm using Spark 2.1.0 and kafka_2.12-0.10.2.0. Code: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic input data topic # 1 2 3 4 5 6 7 8 9 10 11 12

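  • I have some use cases around Kafka topic partitions -> Spark Streaming resource utilization that I'd like to understand more clearly. I use Spark standalone mode, so the only settings I have are "total executors" and "executor memory". As far as I know, and according to the documentation, the way to introduce parallelism into Spark Streaming is to use a partitioned Kafka topic: when I use the Spark-Kafka direct stream integration, the RDD will have the same number of partitions as the Kafka topic. So if I have one partition in the topic and one executor core,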

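  • We need to implement joins on Kafka topics while accounting for late data or data "not in the join": data in the stream that arrives late or is not in the join must not be dropped/lost but marked as timed out, and the join results are produced to an output Kafka topic (with a timeout field if a timeout occurs). (Spark 2.1.1 in a standalone deployment, Kafka 10) Kafka input topics: X, Y, ... The output topic result would look like this: I found three solutions written up here; 1 and 2 come from the official Spark Streaming documentation but are not relevant to us (data not in the join Dtsre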

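  • Hi, I'm reading from a Kafka topic and want to process the data received from Kafka, e.g. tokenize it, filter out unnecessary data, and remove stop words, and finally write it back to another Kafka topic. Then I got the following error: Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start(); Then I edited the code as follows to read from Kafka and write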

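  • I have one Kafka partition and a Spark Streaming application. The server has 10 cores. When Spark Streaming receives a message from Kafka, the subsequent processing takes 5 seconds (that's my code). So I noticed that Spark Streaming reads Kafka messages slowly; my guess is that when Spark reads a message, it waits until the message has been processed, so reading and processing are synchronous. I'd like to know whether Spark can read asynchronously, so that reading from Kafka is not held back by the subsequent processing and Spark consumes data from Kafka quickly. Then I could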

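  • My Spring Boot project has an application that demonstrates the Kafka Streams API. I can consume all messages in a topic with the following command What is the equivalent way to consume messages with a KStream or KTable in the Kafka Streams API? I tried Neither of them works. I did create a test case that consumes with instead of streams, but it doesn't work. The code is uploaded to GitHub for reference. Any help would be great.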