当前位置: 首页 > 知识库问答 >
问题:

如何从工作人员访问SparkSession?

廖弘伟
2023-03-14

我在databricks中使用spark结构化流媒体。在这里,我使用foreach操作对每个数据记录执行一些操作。但是我传递给foreach的函数使用了SparkSession,但它抛出了一个错误:_pickle。PicklingError:无法序列化对象:异常:似乎您正试图从广播变量、操作或转换引用SparkContext。SparkContext只能在驱动程序上使用,不能在工作程序上运行的代码中使用。

那么,有没有办法在foreach中使用SparkSession?

编辑#1:foreach中传递的函数示例如下:

def process_data(row):
  df = spark.createDataFrame([row])    
  df.write.mode("overwrite").saveAsTable("T2")
  spark.sql("""
    MERGE INTO T1
    USING T2 ON T2.type="OrderReplace" AND T1.ReferenceNumber=T2.originalReferenceNumber
    WHEN MATCHED THEN
    UPDATE SET
      shares = T2.shares,
      price = T2.price,
      ReferenceNumber = T2.newReferenceNumber
  """)

所以,我需要Spark会话在这里,这是不可用的Foreach。

共有1个答案

利永年
2023-03-14

通过对要在流数据集上逐行执行的任务的描述,可以尝试使用foreachBatch。

流媒体数据集可能有数千到数百万条记录,如果您在每条记录上访问外部系统,这将是一个巨大的开销。从您的示例中,您似乎正在根据T2中的事件更新基本数据集T1

通过上面的链接,您可以执行以下操作:,

def process_data(df, epoch_id):    
  df.write.mode("overwrite").saveAsTable("T2")
  spark.sql("""
    MERGE INTO T1
    USING T2 ON T2.type="OrderReplace" AND T1.ReferenceNumber=T2.originalReferenceNumber
    WHEN MATCHED THEN
    UPDATE SET
      shares = T2.shares,
      price = T2.price,
      ReferenceNumber = T2.newReferenceNumber
  """)

streamingDF.writeStream.foreachBatch(process_data).start()   

关于你正在做的事情,还有一些额外的要点,

  1. 您正在尝试对流数据集执行foreach并覆盖T2hive表。由于记录是在集群管理器(如warn)中并行处理的,因此可能存在损坏的T2,因为多个任务可能会更新它
 类似资料:
  • 我试图理解火花是如何在引擎盖下洗牌依赖的。因此,我有两个问题: > 在Spark中,执行器如何知道从其他执行器提取数据? 每个执行器在完成其映射端任务后,是否将其状态和位置更新到某个中心实体(可能是驱动程序),并且还原端执行器首先联系驱动程序以获取要从中拉出的每个执行器的位置,然后直接从这些执行器中拉出? null

  • 我希望通过Workday API检索Workday worker(又名employee)的web配置文件URL。使用案例是,我正在构建一个聊天机器人来检索用户信息,我希望能够深入链接到工人(员工)的web配置文件。 问题是我不能做以下任何一项: 从API获取web配置文件URL 从API中的数据创建web配置文件URL web配置文件URL如下所示。用户ID看起来像就在扩展名,因为这是员工档案之间

  • 问题内容: 我在rails-3项目上使用Resque来处理计划每5分钟运行一次的作业。我最近做了一些工作,使这些工作的创建雪上加霜,而堆栈已经击中了1000多个工作。我修复了导致许多作业排队的问题,现在我遇到的问题是由错误创建的作业仍然存在,因此由于将作业添加到具有1000多个作业的队列中,因此很难进行测试。我似乎无法停止这些工作。我尝试使用flushall命令从redis- cli中删除队列,但

  • 我正在玩Kafka-Connect。我让 在独立模式和分布式模式下工作。 他们宣传工人(负责运行连接器)可以通过 进行管理 但是,我还没有看到任何描述如何实现这一目标的文档。 我如何着手让< code>YARN执行工人?如果没有具体的方法,是否有通用的方法来让应用程序在< code>YARN中运行? 我已经使用< code>spark-submit将< code>YARN与SPARK一起使用,但是

  • 所以我的问题很简单。当类成员是而不是时,我可以访问单独类中的类成员,但当它们是时,我似乎不能。使用JDK7U6(W/JavaFX2.2)。 简单的例子。第一个管用,第二个不行。可以在第一个示例中赋值,但是在第二个示例中得到了一个NullPointer。有人能解释为什么在下面的第二个示例中是null吗? 更新:我从最初的问题中删除了@fxml注释,因为我认为它们对我遇到的问题没有必要。另外,请参阅@

  • 问题内容: 我有一些Resque工作者使用Rails应用程序。看来我最多只能同时运行2个工人(应用程序在生产Apache的EC2上运行)。有什么办法可以提高这个限制? 编辑: 我在redis.conf中有maxclients 0 我可以通过rake来启动10个工作程序,但是当他们实际上正在排队时,我在浏览器中得到“ ERR最大到达客户端数”。 编辑:更新的错误(在原始文件中是正确的) 编辑:实际上