当前位置: 首页 > 知识库问答 >
问题:

从Kafka消费,使用Kafka方法和火花流给出了不同的结果

闻昊英
2023-03-14

我正在尝试使用spark Streaming从Kafka中消耗一些数据。

我创造了2个工作岗位,

  1. 一个简单的Kafka作业,它使用:
consumeFirstStringMessageFrom(topic)
{
  "data": {
    "type": "SA_LIST",
    "login": "username@mycompany.com",
    "updateDate": "2020-09-09T14:58:39.775Z",
    "content": [
      {
        "sku": "800633955",
        "status": "ACTIVE",
        "quantity": 1
      }
    ],
    "saCode": "E40056",
    "clientId": "30179801688090",
    "$setOnInsert": {
      "__v": 0
    }
  },
  "operation": "UPDATE",
  "type": "List"
}
val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaConfig.broker)
      .option("subscribe", kafkaConfig.topic)
      .option("startingOffsets", kafkaConfig.startingOffsets)
      .load()

 df.writeStream
      .outputMode(OutputMode.Append())
      .format("console")
      .option("truncate", false)
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start().awaitTermination()
{
  "key": "I4NTY4NV9MSVNUX1dJU0hMSVNUIg==",
  "value": "eyJkYXRhIjp7InR5cGUiOiJXSVNITElTVCIsImxvZ2luIjoiZHJlYW1lcjJAeW9wbWFpbC5jb20iLCJ1cGRhdGVEYXRZSI6Ikxpc3QifQ==",
  "topic": "PLP_GLOBAL_QA",
  "partition": 0,
  "offset": 1826,
  "timestamp": "2020-09-10T16:09:08.606Z",
  "timestampType": 0
}

共有1个答案

越雨泽
2023-03-14

Spark流作业以序列化的形式向您显示数据,而您的Kafka使用者已经对其进行了反序列化。

根据Spark Structured Kafka integration guide,您不仅可以获得Kafka消息的关键和价值,还可以获得其他(元)信息。下面是您从Kafka获得的每条消息的模式:

Column      Type
key         binary
value       binary
topic       string
partition   int
offset      long
timestamp   timestamp
timestampType   int

如果您只想选择键和值,甚至只想选择值,您可以选择它们并将它们转换为人类可读的字符串:

[...]
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
 类似资料:
  • 我使用的是运行在AWS中的spark独立集群(spark and spark-streaming-kafka version 1.6.1),并对检查点目录使用S3桶,每个工作节点上没有调度延迟和足够的磁盘空间。 没有更改任何Kafka客户端初始化参数,非常肯定Kafka的结构没有更改: 也不明白为什么当直接使用者描述说时,我仍然需要在创建流上下文时使用检查点目录?

  • 为什么以及何时会选择将Spark流媒体与Kafka结合使用? 假设我有一个系统通过Kafka每秒接收数千条消息。我需要对这些消息应用一些实时分析,并将结果存储在数据库中。 我有两个选择: > < li> 创建我自己的worker,该worker从Kafka读取消息,运行分析算法并将结果存储在DB中。在Docker时代,只需使用scale命令就可以轻松地在我的整个集群中扩展这个工作线程。我只需要确保

  • 我正在使用spark structured streaming(2.2.1)来消费来自Kafka(0.10)的主题。 我的检查点位置设置在外部HDFS目录上。在某些情况下,我希望重新启动流式应用程序,从一开始就消费数据。然而,即使我从HDFS目录中删除所有检查点数据并重新提交jar,Spark仍然能够找到我上次使用的偏移量并从那里恢复。偏移量还在哪里?我怀疑与Kafka消费者ID有关。但是,我无法

  • 我已经在Ubuntu上设置了Kafka和Spark。我正在尝试阅读Kafka的主题通过火花流使用pyspark(Jupyter笔记本)。Spark既没有读取数据,也没有抛出任何错误。 null Kafka生产者:bin/kafka-console-producer.sh--broker-list localhost:9092--topic new_topic Kafka使用者:bin/kafka-

  • 我对kafka和kafka-python相当陌生。安装kafka-python后,我从这里尝试了一个简单的消费者代码实现-http://kafka-python.readthedocs.io/en/master/usage.html 我一直在kafka的bin目录中编写消费者代码,并尝试从那里运行python代码。但是,我遇到以下错误: 回溯(最近一次调用):文件 “KafkaConsumer.p

  • 我的结构是这样的:日志文件 但我卡在Kafka到Logstash部分。 首先,Filebeat可以向Kafka生成消息,我可以使用以下方式检查它: 也可以由命令使用: 但是,当我尝试使用logstash来消费主题时,没有任何东西可以被检索到,Zoomaster一直在抛出: 2017-11-13 16:11:59205[myid:]-信息[NIOServerCxn.工厂:0.0.0.0/0.0.0.