我正在使用Spark结构化流媒体阅读Kafka主题。
DataFrame<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
我错过什么了吗?
通过将kafka-clients-0.10.1.1.jar更改为kafka-clients-0.10.0.1.jar可以解决问题。
在这里找到了引用Spark Structured Stream只从Kafka的一个分区获取消息
我使用spark 2.2.1,kafka_2.12-1.0.0和scala从kafka获取一些json数据,但是,我只连接了kafka,没有数据输出。 这里是我的scala代码: 这是我的绒球.xml 我运行这段代码,控制台没有显示任何来自kafka的数据。 这里是控制台输出: 输出只是说我的消费者群体已经死亡。我的kafka运行良好,我可以使用控制台命令从“行为”主题中获取数据。总之,Kafka
我有一个 spark 2.0 应用程序,它使用火花流(使用火花流-kafka-0-10_2.11)从 kafka 读取消息。 结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它。 在常规流中,我使用kafkaUtils创建Dstrean,在我传递的参数中,它是值deserializer。 在结构化流中,文档说我应该使用DataFrame函数进行反序列化,但我不知道这到底是什么意思。 我
我开发了一个Python Kafka生成器,它将多个json记录作为nd-json二进制字符串发送到一个Kafka主题。然后,我尝试用PySpark在Spark结构化流媒体中读取这些消息,如下所示:
我一直在用Scala 2.11阅读spark structured streaming(2.4.4)中Kafka的avro序列化消息。为此,我使用了spark avro(下面的dependency)。我使用合流Kafka库从python生成Kafka消息。Spark streaming能够使用模式来使用消息,但无法正确读取字段的值。我准备了一个简单的例子来说明这个问题,代码在这里可用:https:
我是Kafka的新手,正在开发一个原型,将专有的流媒体服务连接到Kafka中。 我希望得到一个主题上发送的最后一条消息的密钥,因为我们的内部流消费者需要用连接时收到的最后一条消息的ID登录。 我尝试使用使用者执行以下操作,但当同时运行控制台使用者时,我看到消息被重播。 这是意料之中的行为还是我走错了路?