我正在使用微批处理(readstream
)从Kafka stream读取消息,处理消息并通过writestream
将结果写入另一个Kafka主题。该作业(流式查询)被设计为“永远”运行,处理大小为10秒(处理时间)的微批。已设置checkpointdirectory
选项,因为Spark需要检查点。
通过在两个流查询各自的writeStream调用中设置CheckPointLocation
选项,可以实现两个流查询的独立性。不应在sparksession中集中设置检查点位置。
那样,它们就可以独立运行,互不干扰。
我在生产中遇到检查点问题,当 spark 无法从_spark_metadata文件夹中找到文件时 已经提出了一个问题,但目前还没有解决方案。 在检查点文件夹中,我看到批次29尚未提交,所以我可以从检查点的、和/或中删除一些内容,以防止火花因缺少文件而失败?
我定义了一个结构… 有时我给它分配一个空会话(因为不可能为零) 然后我想检查一下,如果它是空的: 显然这是行不通的。我怎么写?
URI URI=URI.CREATE(url1); String requestBody=“{\”Objects\“:[471642]}”; HttpRequest request=RequestFactory.BuildPostRequest(新建GenericUrl(uri), HttpResponse response=request.execute(); '''
我有这个数据类型,只是对相关数据进行分组。它应该是一个类似结构的东西,所以我选择了一个 另一方面, 没有默认值,因此我驻留在另一个答案中提出的黑客攻击中。 显然,这使得类型检查失败:< code >错误:“Callable[[Type[NT],Any,Any,Any,Any,Any],NT]”没有属性“_ _ defaults _ _” 因为我很清楚这是一种黑客行为,所以我使用内联注释< code
我正在使用spark structured streaming(2.2.1)来消费来自Kafka(0.10)的主题。 我的检查点位置设置在外部HDFS目录上。在某些情况下,我希望重新启动流式应用程序,从一开始就消费数据。然而,即使我从HDFS目录中删除所有检查点数据并重新提交jar,Spark仍然能够找到我上次使用的偏移量并从那里恢复。偏移量还在哪里?我怀疑与Kafka消费者ID有关。但是,我无法
在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之