我正在尝试使用spark Streaming从Kafka中消耗一些数据。
我创造了2个工作岗位,
consumeFirstStringMessageFrom(topic)
{
"data": {
"type": "SA_LIST",
"login": "username@mycompany.com",
"updateDate": "2020-09-09T14:58:39.775Z",
"content": [
{
"sku": "800633955",
"status": "ACTIVE",
"quantity": 1
}
],
"saCode": "E40056",
"clientId": "30179801688090",
"$setOnInsert": {
"__v": 0
}
},
"operation": "UPDATE",
"type": "List"
}
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaConfig.broker)
.option("subscribe", kafkaConfig.topic)
.option("startingOffsets", kafkaConfig.startingOffsets)
.load()
df.writeStream
.outputMode(OutputMode.Append())
.format("console")
.option("truncate", false)
.trigger(Trigger.ProcessingTime("2 seconds"))
.start().awaitTermination()
{
"key": "I4NTY4NV9MSVNUX1dJU0hMSVNUIg==",
"value": "eyJkYXRhIjp7InR5cGUiOiJXSVNITElTVCIsImxvZ2luIjoiZHJlYW1lcjJAeW9wbWFpbC5jb20iLCJ1cGRhdGVEYXRZSI6Ikxpc3QifQ==",
"topic": "PLP_GLOBAL_QA",
"partition": 0,
"offset": 1826,
"timestamp": "2020-09-10T16:09:08.606Z",
"timestampType": 0
}
Spark流作业以序列化的形式向您显示数据,而您的Kafka使用者已经对其进行了反序列化。
根据Spark Structured Kafka integration guide,您不仅可以获得Kafka消息的关键和价值,还可以获得其他(元)信息。下面是您从Kafka获得的每条消息的模式:
Column Type
key binary
value binary
topic string
partition int
offset long
timestamp timestamp
timestampType int
如果您只想选择键和值,甚至只想选择值,您可以选择它们并将它们转换为人类可读的字符串:
[...]
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
我使用的是运行在AWS中的spark独立集群(spark and spark-streaming-kafka version 1.6.1),并对检查点目录使用S3桶,每个工作节点上没有调度延迟和足够的磁盘空间。 没有更改任何Kafka客户端初始化参数,非常肯定Kafka的结构没有更改: 也不明白为什么当直接使用者描述说时,我仍然需要在创建流上下文时使用检查点目录?
为什么以及何时会选择将Spark流媒体与Kafka结合使用? 假设我有一个系统通过Kafka每秒接收数千条消息。我需要对这些消息应用一些实时分析,并将结果存储在数据库中。 我有两个选择: > < li> 创建我自己的worker,该worker从Kafka读取消息,运行分析算法并将结果存储在DB中。在Docker时代,只需使用scale命令就可以轻松地在我的整个集群中扩展这个工作线程。我只需要确保
我正在使用spark structured streaming(2.2.1)来消费来自Kafka(0.10)的主题。 我的检查点位置设置在外部HDFS目录上。在某些情况下,我希望重新启动流式应用程序,从一开始就消费数据。然而,即使我从HDFS目录中删除所有检查点数据并重新提交jar,Spark仍然能够找到我上次使用的偏移量并从那里恢复。偏移量还在哪里?我怀疑与Kafka消费者ID有关。但是,我无法
我已经在Ubuntu上设置了Kafka和Spark。我正在尝试阅读Kafka的主题通过火花流使用pyspark(Jupyter笔记本)。Spark既没有读取数据,也没有抛出任何错误。 null Kafka生产者:bin/kafka-console-producer.sh--broker-list localhost:9092--topic new_topic Kafka使用者:bin/kafka-
我对kafka和kafka-python相当陌生。安装kafka-python后,我从这里尝试了一个简单的消费者代码实现-http://kafka-python.readthedocs.io/en/master/usage.html 我一直在kafka的bin目录中编写消费者代码,并尝试从那里运行python代码。但是,我遇到以下错误: 回溯(最近一次调用):文件 “KafkaConsumer.p
我的结构是这样的:日志文件 但我卡在Kafka到Logstash部分。 首先,Filebeat可以向Kafka生成消息,我可以使用以下方式检查它: 也可以由命令使用: 但是,当我尝试使用logstash来消费主题时,没有任何东西可以被检索到,Zoomaster一直在抛出: 2017-11-13 16:11:59205[myid:]-信息[NIOServerCxn.工厂:0.0.0.0/0.0.0.