当前位置: 首页 > 知识库问答 >
问题:

利用python和Kafka给出错误的Spark结构流

孟雪风
2023-03-14

当我尝试为kafka启动一个readStream时,我得到了以下错误,我的kafka已经启动并运行,我测试了它多次以确保它正在处理。Kafka主题也被创建。

kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "mytopic") \
        .option("startingOffsets", "earliest") \
        .load()

共有1个答案

柴兴贤
2023-03-14

您需要导入kafka依赖项来运行这个!对于pyspark,您可以下载jar并将其放到Spark/jars目录中,或者在sparkSession inital配置中导入依赖项。请跟随这个Kafka结构的流媒体文档

我希望我能帮上忙,你能问我什么,谢谢!

 类似资料:
  • 我对kafka和kafka-python相当陌生。安装kafka-python后,我从这里尝试了一个简单的消费者代码实现-http://kafka-python.readthedocs.io/en/master/usage.html 我一直在kafka的bin目录中编写消费者代码,并尝试从那里运行python代码。但是,我遇到以下错误: 回溯(最近一次调用):文件 “KafkaConsumer.p

  • 问题内容: 根据Wolfram Mathematica: cos(50) = 0.6427876096865394 ; 但是这段Java代码: 给出 0.9649660284921133 。 有什么问题吗? 问题答案: 期望参数以弧度为单位。这将返回您需要的结果:

  • 我有一个storm拓扑,如下所示: 螺栓ta->螺栓b->螺栓tc->螺栓d 只是对请求进行一些格式化,并发出另一个元组。执行一些处理,并为接受的每个元组发出大约100个元组。和处理这些元组。所有的bolts都实现了。 MessageTimeoutInsecons:300

  • 我使用结构化流媒体(Spark 2.0.2)来消费Kafka消息。使用scalapb,protobuf中的消息。我得到以下错误。请帮助。。 线程“main”scala中的异常。ScalaRefltionException:不是一个术语org.apache.spark.sql.catalyst.符号$SymbolApi$9.apply术语(Seflection.scala:592)org.apach

  • 我试图使用Spark,更具体地说是PySpark和结构化流来消费Kafka。 PY4JJavaError:调用O70时出错。AwaitTermination

  • 当我使用put操作将数据对象插入aws firhose流时,它工作正常。由于在我的fire hose流上启用了lambda函数。因此调用了lambda函数,但给了我一个输出结构响应错误: 所以现在我已经创建了我的lambda函数,这样可以做出正确的输出结构: 现在,我得到了以下关于将“数据”字段编码为 如果我将“hello”更改为b“hello”之类的字节,则会出现以下错误: