当前位置: 首页 > 知识库问答 >
问题:

Cosmos Changefeed火花流随机停止

钮高朗
2023-03-14

我有一个 Spark 流式处理作业,它读取 Cosmos 更改源数据,如下所示,在具有 DBR 8.2 的数据砖集群中运行。

cosmos_config = {
  "spark.cosmos.accountEndpoint": cosmos_endpoint,
  "spark.cosmos.accountKey": cosmos_key,
  "spark.cosmos.database": cosmos_database,
  "spark.cosmos.container": collection,
  "spark.cosmos.read.partitioning.strategy": "Default",
  "spark.cosmos.read.inferSchema.enabled" : "false",
  "spark.cosmos.changeFeed.startFrom" : "Now",
  "spark.cosmos.changeFeed.mode" : "Incremental"
}
df_ read = (spark.readStream
                 .format("cosmos.oltp.changeFeed")
                 .options(**cosmos_config)
                 .schema(cosmos_schema)
                 .load())
                     
                     
df_write = (df_ read.withColumn("partition_date",current_date())
              .writeStream
              .partitionBy("partition_date")
              .format('delta')
              .option("path", master_path)
              .option("checkpointLocation", f"{master_path}_checkpointLocation")
              .queryName("cosmosStream")
              .trigger(processingTime='10 seconds')
              .start()
            )       

虽然作业正常工作,但偶尔,流会突然停止,并且在log4j输出中出现以下循环。重新启动作业将处理“待办事项”中的所有数据。以前有人经历过这样的事情吗?我不确定是什么原因造成的。有什么想法吗?

22/02/27 00:57:58 INFO HiveMetaStore: 1: get_database: default
22/02/27 00:57:58 INFO audit: ugi=root  ip=unknown-ip-addr  cmd=get_database: default   
22/02/27 00:57:58 INFO DriverCorral: Metastore health check ok
22/02/27 00:58:07 INFO HikariDataSource: metastore-monitor - Starting...
22/02/27 00:58:07 INFO HikariDataSource: metastore-monitor - Start completed.
22/02/27 00:58:07 INFO HikariDataSource: metastore-monitor - Shutdown initiated...
22/02/27 00:58:07 INFO HikariDataSource: metastore-monitor - Shutdown completed.
22/02/27 00:58:07 INFO MetastoreMonitor: Metastore healthcheck successful (connection duration = 88 milliseconds)
22/02/27 00:58:50 INFO RxDocumentClientImpl: Getting database account endpoint from https://<cosmosdb_endpoint>.documents.azure.com:443

共有1个答案

秦俊
2023-03-14

您使用的是哪个版本的宇宙火花连接器?在 4.3.0 和 4.6.2 之间,在批量引入代码路径中修复了几个 bug。

看见https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md更多细节。

 类似资料:
  • 我正在尝试优化我的火花应用工作。 我试图理解这个问题的要点:如何在唯一键上连接数据帧时避免混乱? > 我已经确保必须发生加入操作的键分布在同一分区中(使用我的自定义分区程序)。 我也不能做广播加入,因为我的数据可能会根据情况变大。 在上面提到的问题的答案中,重新分区只优化了连接,但我需要的是无需切换即可连接。在分区内的键的帮助下,我对连接操作很满意。 有可能吗?如果不存在类似的功能,我想实现像jo

  • 我试图从聚合原理的角度来理解火花流。Spark DF 基于迷你批次,计算在特定时间窗口内出现的迷你批次上完成。 假设我们有数据作为- 然后首先对Window_period_1进行计算,然后对Window_period_2进行计算。如果我需要将新的传入数据与历史数据一起使用,比如说Window_priod_new与Window_pperid_1和Window_perid_2的数据之间的分组函数,我该

  • 我有一个spark流媒体作业,它每5秒钟读取一次Kafka,对传入的数据进行一些转换,然后写入文件系统。 这实际上不需要是一个流式作业,实际上,我只想每天运行一次,将消息排入文件系统。但我不知道如何停止这项工作。 如果我向streamingContext传递超时。等待终止,它不会停止进程,它所做的只是导致进程在流上迭代时产生错误(请参见下面的错误) 实现我所要做的事情的最佳方式是什么 这是Pyth

  • 如果spark streaming在10秒的批处理间隔中获得50行消息,并且在40.5行消息之后,这10秒就结束了,剩下的时间落入另一个10秒的间隔中,前40.5行的文本是一个RDD被首先处理,在我的用例中,前40行是有意义的,但是下一个。5行没有意义,第二个RDD首先也是这样。5行,我的问题是否有效?。请提供建议如何处理这个问题?。 谢谢比尔。

  • 我正在使用火花流,我从Kafka读取流。阅读此流后,我将其添加到hazelcast地图中。 问题是,我需要在读取Kafka的流之后立即从地图中过滤值。 我正在使用下面的代码来并行化地图值。 但在这个逻辑中,我在另一个逻辑中使用JavaRDD,即JavaInputDStream.foreachRDD,这会导致序列化问题。 第一个问题是,如何通过事件驱动来运行spark作业? 另一方面,我只是想得到一