当前位置: 首页 > 知识库问答 >
问题:

Apache Kafka和Spark流媒体

韦正业
2023-03-14

我正在读这篇博文:

http://blog.jaceklaskowski.pl/2015/07/20/real-time-data-processing-using-apache-kafka-and-spark-streaming.html

它讨论了如何使用Spark Streaming和Apache Kafka进行一些近实时处理。我完全理解这篇文章。它确实展示了我如何使用Spark Streaming来阅读某个主题的消息。我想知道是否有一个Spark流媒体API,我可以用来将消息写入Kakfa主题?

我的用例非常简单。我有一组数据,可以以固定的间隔(比如每秒)从给定的源读取。我使用反应流来实现这一点。我想用Spark对这些数据进行一些分析。我想有容错能力,所以Kafka开始发挥作用。因此,我基本上会做以下几点(如果我错了,请纠正我):

  1. 使用反应式流以恒定的间隔从外部源获取数据
  2. 将结果导入Kafka主题
  3. 使用Spark Streaming,为消费者创建流上下文
  4. 对消耗的数据执行分析

不过,另一个问题是,Spark中的Streaming API是反应式流规范的实现吗?它是否具有背压处理(Spark Streaming v1.5)?

共有2个答案

唐照
2023-03-14

如果您必须将结果流写入另一个Kafka主题,比如“topic_x”,首先,您必须在您试图写入主题_x的结果流中包含名为“Key”和“Value”的列。

result_stream = result_stream.selectExpr('CAST (key AS STRING)','CAST (value AS STRING)')
kafkaOutput = result_stream \
               .writeStream \
               .format('kafka') \
               .option('kafka.bootstrap.servers','192.X.X.X:9092') \
               .option('topic','topic_x') \
               .option('checkpointLocation','./resultCheckpoint') \
               .start()

kafkaOutput.awaitTermination()

有关更多详细信息,请查看https://spark.apache.org/docs/2.4.1/structured-streaming-kafka-integration.html

柯乐童
2023-03-14

>

你可以在Spark流媒体应用程序中给Kafka写信,下面是一个例子。

(充分披露:我是一些背压工作的实施者之一)

 类似资料:
  • 问题内容: 我正在使用Maven 我添加了以下依赖项 我还在代码中添加了jar 它完全可以正常工作,没有任何错误,在通过spark-submit提交时出现以下错误,非常感谢您的帮助。谢谢你的时间。 线程“主要” java.lang.NoClassDefFoundError中的异常:sun.reflect处的KafkaSparkStreaming.sparkStreamingTest(KafkaSp

  • 我正在尝试从Spark官方网站运行Spark Streaming示例 这些是我在pom文件中使用的依赖项: 这是我的Java代码: 当我尝试从Eclipse运行它时,我遇到以下异常: 我从我的IDE(eclipse)运行它。我是否必须创建并将JAR部署到火花中以使其运行。如果有人知道这个异常,请分享您的经验。提前谢谢

  • 这是因为检查点只存储了其中一个数据流的偏移量吗?浏览Spark结构流文档,似乎可以在Spark 2.2或>中进行流源的联接/联合

  • 我正在测试我的火花流应用程序,我在我的代码中有多个函数:-其中一些在DStream[RDD[XXX]上运行,其中一些在RDD[XXX]上运行(在我做DStream.foreachRDD之后)。 我使用Kafka log4j appender来记录发生在我的函数中的业务案例,这些案例在DStream[RDD]上运行 但只有当来自于在RDD上运行的函数时,数据才会附加到Kafka- 有人知道这种行为的

  • 我通过spark streaming应用了一个实时异常检测系统。在每个流媒体时间间隔内,如果数据点异常,AWS SNS会发送电子邮件订阅帐户。但是AWS SNS java sdk不喜欢在spark流媒体中工作。下面是错误消息 ERROR StreamingContext:启动上下文时出错,将其标记为停止的java。伊奥。NotSerializableException:已启用数据流检查点,但数据流

  • 我是一个初学者,试图使用spark streaming获得推文,使用Scala和一些过滤器关键字。是否有可能在流媒体之后只过滤那些没有地理定位为Null的推文?我正在尝试保存ElasticSearch中的推文。所以,在将tweet地图保存到ElasticSearch之前,我可以过滤那些带有地理定位信息的地图,然后保存它们吗?我正在使用json4s.jsondsl和tweet中的字段创建JSON。这