当前位置: 首页 > 知识库问答 >
问题:

火花读取来自 SAS 国际移民组织的 JDBC

弓玉书
2023-03-14

我正在尝试使用火花 JDBC 从 SAS IOM 读取数据。问题是SAS JDBC驱动程序有点奇怪,所以我需要创建自己的方言:

object SasDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:sasiom")
  override def quoteIdentifier(colName: String): String = "\"" + colName + "\"n"
}

然而,这还不够。SAS区分了列标签(=人类可读的名称)和列名称(=您在SQL查询中使用的名称),但似乎Spark在模式发现中使用列标签而不是名称,请参阅下面的JdbcUtils摘录:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L293

while (i < ncols) {
  val columnName = rsmd.getColumnLabel(i + 1)

这会导致SQL错误,因为它试图在生成的SQL代码中使用人类可读的列名。

为了使SAS IOM JDBC工作,这需要是getColumnName而不是getColumn标签。有没有办法用方言来说明这一点?除了包装整个com.sas.rio之外,我真的找不到一个方法来实现这一点。MVADriver和resultsetmeta

直率的

共有1个答案

薛利
2023-03-14

与此同时,我发现了如何做,所以只是发布以供参考。诀窍是注册你自己的方言,如下所示。

另外,SAS用空格填充所有的varchar列,所以我修剪所有的string列。

  def getSasTable(sparkSession: org.apache.spark.sql.SparkSession, tablename: String): org.apache.spark.sql.DataFrame = {                                       
    val host : String = "dwhid94.msnet.railb.be";                                                                                                               
    val port : String = "48593";                                                                                                                                
    val props = new java.util.Properties();                                                                                                                     
    props.put("user", CredentialsStore.getUsername("sas"))                                                                                                      
    props.put("password", CredentialsStore.getPassword("sas"))                                                                                                  
    props.setProperty("driver", "com.sas.rio.MVADriver")                                                                                                        
    val sasconurl : String =  String.format("jdbc:sasiom://%s:%s", host, port);                                                                                 
                                                                                                                                                                
    object SasDialect extends JdbcDialect {                                                                                                                     
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:sasiom")                                                                              
      override def quoteIdentifier(colName: String): String = "\"" + colName + "\"n"                                                                            
    }                                                                                                                                                           
    JdbcDialects.registerDialect(SasDialect)                                                                                                                    
    val df = sparkSession.read                                                                                                                                  
      .option("url", sasconurl)                                                                                                                                 
      .option("driver", "com.sas.rio.MVADriver")                                                                                                                
      .option("dbtable", tablename)                                                                                                                             
      .option("user",CredentialsStore.getUsername("sas"))                                                                                                       
      .option("password",CredentialsStore.getPassword("sas"))                                                                                                   
      .option("fetchsize",100)                                                                                                                                  
      .format("jdbc")                                                                                                                                           
      .load()                                                                                                                                                   
                                                                                                                                                                
    val strippedDf = sparkSession.createDataFrame(df.rdd.map(r => Row(r.toSeq.map(x => x match {case s: String => s.trim; case _ => x}): _*)), df.schema);      
    return strippedDf;                                                                                                                                          
  }                                                                                                                                                             
                                                                                                                                                                
 类似资料:
  • 试图读取一个空的镶木地板目录,得到这个错误 无法指定拼花地板的架构。必须手动指定 我的代码 尝试处理scala尝试和定期检查空目录 任何想法

  • 我想用Apache Spark读入具有以下结构的文件。 csv太大了,不能使用熊猫,因为读取这个文件需要很长时间。有什么方法类似于 多谢!

  • 将现有应用程序从Spark 1.6移动到Spark 2.2*(最终)会导致错误“org.apache.spark.SparkExctive:任务不可序列化”。我过于简化了我的代码,以演示同样的错误。代码查询拼花文件以返回以下数据类型:“org.apache.spark.sql.数据集[org.apache.spark.sql.行]”我应用一个函数来提取字符串和整数,返回字符串。一个固有的问题与Sp

  • 我有一些Spark经验,但刚开始使用Cassandra。我正在尝试进行非常简单的阅读,但性能非常差——不知道为什么。这是我正在使用的代码: 所有3个参数都是表上键的一部分: 主键(group\u id,epoch,group\u name,auto\u generated\u uuid\u field),聚类顺序为(epoch ASC,group\u name ASC,auto\u generat

  • 正在尝试读取avro文件。 无法将运行到Avro架构的数据转换为Spark SQL StructType:[“null”,“string”] 尝试手动创建架构,但现在遇到以下情况: 通用域名格式。databricks。火花阿夫罗。SchemaConverters$CompatibleSchemaException:无法将Avro架构转换为catalyst类型,因为路径处的架构不兼容(avroTyp

  • 我在一个Spark项目上工作,这里我有一个文件是在parquet格式,当我试图用java加载这个文件时,它给了我下面的错误。但是,当我用相同的路径在hive中加载相同的文件并编写查询select*from table_name时,它工作得很好,数据也很正常。关于这个问题,请帮助我。 java.io.ioException:无法读取页脚:java.lang.runtimeException:损坏的文