当前位置: 首页 > 知识库问答 >
问题:

尝试使用spark结构化流来消耗kafka流

赖杰
2023-03-14

我是Kafka流媒体的新手。我使用python设置了一个twitter监听器,它运行在localhost:9092kafka服务器中。我可以使用kafka客户端工具(conduktor)并使用命令“bin/kafka-console-consumer.sh--bootstrap-server localhost:9092-topic twitter--from-begind”来使用侦听器生成的流,但是当我尝试使用Spark结构化流来使用相同的流时,它没有捕获并抛出错误-未能找到数据源:kafka。请按照“结构化流媒体+Kafka集成指南”的部署部分部署应用程序。;找到下面的截图

  1. 命令输出-消耗数据
  2. spark使用者的Jupyter输出-不消耗数据

我的生产者或监听器代码:

auth = tweepy.OAuthHandler("**********", "*************")
auth.set_access_token("*************", "***********************")
# session.set('request_token', auth.request_token)
api = tweepy.API(auth)
class KafkaPushListener(StreamListener):          
    def __init__(self):
        #localhost:9092 = Default Zookeeper Producer Host and Port Adresses
        self.client = pykafka.KafkaClient("0.0.0.0:9092")

    #Get Producer that has topic name is Twitter
        self.producer = self.client.topics[bytes("twitter", "ascii")].get_producer()

    def on_data(self, data):
        #Producer produces data for consumer
        #Data comes from Twitter
        self.producer.produce(bytes(data, "ascii"))
        return True

    def on_error(self, status):
        print(status)
        return True
twitter_stream = Stream(auth, KafkaPushListener())
twitter_stream.filter(track=['#fashion'])

用户从Spark结构化流访问

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "twitter") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

共有1个答案

贺山
2023-03-14

当我提交spark-job时,我必须包含正确的依赖项包版本。我有spark 3.0.0,因此,我包含了-org.apache.spark:spark-sql-kafka-0-102.12:3.0.0包

 类似资料:
  • 我有一个Kafka2.1消息代理,希望在Spark2.4中对消息的数据进行一些处理。我想使用齐柏林0.8.1笔记本快速原型。 我下载了结构化流所必需的spark-streaming-kafka-0-102.11.jar(http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html),并将其作为“dep

  • 我有一个 spark 2.0 应用程序,它使用火花流(使用火花流-kafka-0-10_2.11)从 kafka 读取消息。 结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它。 在常规流中,我使用kafkaUtils创建Dstrean,在我传递的参数中,它是值deserializer。 在结构化流中,文档说我应该使用DataFrame函数进行反序列化,但我不知道这到底是什么意思。 我

  • 我是火花的新手。我使用结构化流从Kafka读取数据。 我可以在Scala中使用此代码读取数据: 我在值列中的数据是Thrift记录。Streaming api以二进制格式提供数据。我看到了将数据转换为string或json的示例,但我找不到任何关于如何将数据反序列化为Thrift的示例。 我如何才能实现这一点?

  • 我设计了一个 Nifi 流,将以 Avro 格式序列化的 JSON 事件推送到 Kafka 主题中,然后我尝试在 Spark 结构化流式处理中使用它。 虽然Kafka部分工作正常,但Spark结构化流媒体无法读取Avro事件。它失败,错误如下。 火花代码 Spark中使用的模式 Kafka中的示例主题数据 以下是版本信息 感谢您的帮助。

  • 我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。

  • 我使用结构化流媒体(Spark 2.0.2)来消费Kafka消息。使用scalapb,protobuf中的消息。我得到以下错误。请帮助。。 线程“main”scala中的异常。ScalaRefltionException:不是一个术语org.apache.spark.sql.catalyst.符号$SymbolApi$9.apply术语(Seflection.scala:592)org.apach