当前位置: 首页 > 知识库问答 >
问题:

Spark create或replace temp view不多次更新现有表

谷梁襦宗
2023-03-14

第1步
我需要比较两个csv文件重新获得,而一个是静态的(DB. csv),另一个是从web下载的Downloaded.csv(这是动态的,可能会更新记录)

步骤2
比较两个csv的差异后,将被写入mongoDB

第三步
现在下载。csv文件需要替换DB。csv,则步骤1中的相同逻辑将继续。

示例说明

第一步

DB.csv   [temp table `db` ]

sno APPLE   BANANA
1   13  11
2   2   22
3   2   22

Downloaded.csv [temp table `downloaded` ]
sno APPLE   BANANA
1   n   11
2   2   22
3   2   22

第二步

Difference dataset
sno APPLE   BANANA
1     n       11

第三步

DB.csv [temp table `db` - updated ]
sno APPLE   BANANA
 1  n   11
 2  2   22
 3  2   22

重复步骤1

DB.csv [temp table `db` - updated ]
sno APPLE   BANANA
 1  n   11
 2  2   22
 3  2   22

Downloaded.csv [temp table `downloaded` - new downloaded record ]
sno APPLE   BANANA
1   n   11
2   2   n
3   2   22

重复步骤2

Difference dataset
sno APPLE   BANANA
2     2       n  

重复步骤3

DB.csv [temp table `db` ]
sno APPLE   BANANA
 1  n   11
 2  2   n
 3  2   22

这是我的逻辑

 Dataset<Row> downloaded =spark.read().option("header","true").csv("/home/exa4/Desktop/downloaded.csv");
     Dataset<Row> db =spark.read().option("header","true").csv("/home/exa4/Desktop/db.csv");
     downloaded.createOrReplaceTempView("downloaded");
     db.createOrReplaceTempView("db");

     Dataset<Row> diffs= spark.sql("select * from downloaded EXCEPT select * from db");

    //write updates to collection
    MongoSpark.save(diffs.write().option("collection", "UpdatedRecords").mode("overwrite"));

    //replacing old DB with new dataset downloaded 
    downloaded.createOrReplaceTempView("db");

     ////For every 10 seconds I may intenstionaly change the downloaded.csv for testing , as it is dynamic dataset 
     while(true){
         long start = System.currentTimeMillis();
            Thread.sleep(10000);

             //this will be newly downloaded file from net 
             Dataset<Row> downloaded =spark.read().option("header","true").csv("/home/exa4/Desktop/downloaded.csv");
             downloaded.createOrReplaceTempView("downloaded");

            //now comparing downloaded with previously updated dataset 
            Dataset<Row> diffs_= spark.sql("select * from downloaded EXCEPT select * from db");
            diffs_.show();
             ////HERE I AM GETTING NULL RECORDS 

            downloaded.createOrReplaceTempView("db");

     }

共有1个答案

黄景胜
2023-03-14
spark.catalog.refreshTable(s"$dbName.$destinationTableName")

用dbname和表名替换

 类似资料:
  • 将excel的大数据导入到MySQL数据库需要很长时间,那么如何提高性能呢? Excel数据喜欢以下内容: 学生表 床单课程 MySQL的表喜欢如下: 学生桌 资源表 图像的浅灰色区域可能会得到改善,但我不知道如何对其进行优化。使用 和字段确定行数据是否唯一<代码>参考id和表中的字段确定行数据是否唯一。

  • 问题内容: 我一直在使用K8S ConfigMap和Secret管理我们的属性。我的设计非常简单,可以将属性文件保存在git repo中,并使用诸如Thoughtworks GO之类的构建服务器将它们自动配置为ConfigMaps或Secrets(在选择条件下)到我的k8s集群中。 当前,我发现必须总是删除现有的ConfigMap和Secret并创建一个新的要进行更新的效率不是很高: 有没有一种简

  • 问题内容: 也许我弄错了,但是我虽然JPA能够更新现有表(更改了添加列的模型),但在我的情况下却无法正常工作。 我可以在日志中看到eclipselink尝试创建它,但是由于它已经存在而失败。它不会尝试更新以添加该列,而是继续进行。 这是带有更改的表(添加了在线列) 此后,继续进行以下操作。 我是在做错什么还是错误? 问题答案: 从EclipseLink 2.4开始,您可以在持久性单元的规范中使用它

  • 我有一个标签,这是使用了多次在我的网页。它工作得很好,直到我尝试按Ajax更新标签。结果:只有第一个标签得到更新。这是一个已知的问题吗?我不确定,因为我不能打开JIRA页面:https://issues.apache.org/JIRA/browse/wicket(一直在跳)。我使用的是wicket 7.3.0版本 =>只更新第一个标签。我有一个解决办法,通过添加4个不同的标签实例与相同的文本内容。

  • 我第一次尝试React钩子,一切似乎都很好,直到我意识到当我获取数据并更新两个不同的状态变量(数据和加载标志)时,我的组件(数据表)会呈现两次,尽管对状态更新器的两次调用都发生在同一个函数中。下面是我的api函数,它将两个变量都返回到我的组件中。 在普通的类组件中,您只需调用一次来更新状态,状态可能是一个复杂的对象,但“挂钩方式”似乎是将状态拆分为更小的单元,其副作用似乎是在单独更新时多次重新呈现

  • 问题内容: 如果我需要通过一个“动作”来更新或插入到多个表中,请调用一个保存信息的示例,其中有多个包含“信息”的表。 出于参数考虑,可以说我们有下表: 姓名地址汽车工作 每次调用保存信息时,都会将其中的每个表插入其中。 哪个更好: 获取必须写入名称表的数据。调用InsertOnSubmit并调用SubmitChanges 获取必须写入地址表的数据。调用InsertOnSubmit并调用Submit