Question:

How to connect to MySQL using spark-scala and fetch data into a DataFrame

南门祯
2023-03-14

I'm new to Scala. The code below does not print the values from the df, and Spark does not stop either: even after running this code for half an hour, it just keeps going.

    import java.sql.DriverManager
    import java.sql.Connection
    import org.apache.spark._
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext._
    import org.apache.spark.sql.SQLContext._
    import org.apache.spark.sql._
    import java.util.concurrent.TimeUnit

    object MysqlTest {

     def main(args: Array[String]) {
       val prop = new java.util.Properties()
       val conf = new SparkConf().setAppName("MysqlDataLoad").setMaster("local")
       val sc = new SparkContext(conf)
       val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

       prop.put("user", "***")

       prop.put("password", "*****")

       val url = "jdbc:mysql://acb-cluster.cluster-cfdz.us-wt-2.rds.amazonaws.com:3306/gsl"

       val df: DataFrame = sqlcontext.read.jdbc(url, "test_20160930_result_prop_alpha", prop)

       df.createOrReplaceTempView("gsl")

       // Create dataframe of required columns from GSL table

       println("********* Data For GSL **********")

       val dataFrame2 = sqlcontext.sql("select * from gsl limit 10")

       dataFrame2.show()

       sc.stop()
      }

    }
The log output below shows where it hangs:

17/05/31 12:30:51 INFO Executor: Starting executor ID driver on host localhost
17/05/31 12:30:51 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 41593.
17/05/31 12:30:51 INFO NettyBlockTransferService: Server created on 192.168.0.132:41593
17/05/31 12:30:51 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/05/31 12:30:51 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.132, 41593, None)
17/05/31 12:30:51 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.132:41593 with 1407.3 MB RAM, BlockManagerId(driver, 192.168.0.132, 41593, None)
17/05/31 12:30:51 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.132, 41593, None)
17/05/31 12:30:51 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.132, 41593, None)
17/05/31 12:30:52 INFO SharedState: Warehouse path is 'file:/home/vna/spark_workspace/sz-dw-etl/spark-warehouse/'.
17/05/31 12:30:57 INFO SparkSqlParser: Parsing command: gsl
********* Data For GSL **********
17/05/31 12:30:57 INFO SparkSqlParser: Parsing command: select * from gsl limit 10

17/05/31 12:30:57 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
17/05/31 12:30:58 INFO CodeGenerator: Code generated in 320.985934 ms
17/05/31 12:30:58 INFO SparkContext: Starting job: collect at MysqlTest.scala:34
17/05/31 12:30:58 INFO DAGScheduler: Got job 0 (collect at MysqlTest.scala:34) with 1 output partitions
17/05/31 12:30:58 INFO DAGScheduler: Final stage: ResultStage 0 (collect at MysqlTest.scala:34)
17/05/31 12:30:58 INFO DAGScheduler: Parents of final stage: List()
17/05/31 12:30:58 INFO DAGScheduler: Missing parents: List()
17/05/31 12:30:58 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at collect at MysqlTest.scala:34), which has no missing parents
17/05/31 12:30:58 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 14.8 KB, free 1407.3 MB)
17/05/31 12:30:58 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 6.2 KB, free 1407.3 MB)
17/05/31 12:30:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.0.132:41593 (size: 6.2 KB, free: 1407.3 MB)
17/05/31 12:30:58 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:996
17/05/31 12:30:58 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at collect at MysqlTest.scala:34)
17/05/31 12:30:58 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/05/31 12:30:58 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 5723 bytes)
17/05/31 12:30:58 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

Some more details on the table I'm trying to fetch: it is 44 GB and has 100,000,000 records. But my query explicitly limits the fetch to just 10 records, without any sorting.
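One likely cause: in the Spark versions this log comes from, the JDBC data source does not push a limit down to MySQL. read.jdbc(url, table, prop) with no partitioning options opens a single connection and issues a plain SELECT over the whole table, so all 100,000,000 rows stream through one task before the local limit 10 is applied. A minimal sketch that pushes the limit into the query MySQL actually runs, reusing sqlcontext, url, and prop from the code above (the JDBC source requires the subquery to carry an alias):

    // Pass a subquery as the "table" so MySQL itself returns only 10 rows,
    // instead of streaming the full 44 GB table through one Spark task.
    val pushdownTable =
      "(select * from test_20160930_result_prop_alpha limit 10) as gsl_sample"

    val df10: DataFrame = sqlcontext.read.jdbc(url, pushdownTable, prop)
    df10.show()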

1 Answer

鲜于河
2023-03-14

Try this:

  import java.util.Properties
  import org.apache.spark.sql.SparkSession

  // spark.read needs a SparkSession; create one if you don't already have it
  val spark = SparkSession.builder().appName("MysqlRead").master("local[*]").getOrCreate()

  val properties = new Properties()
  properties.put("user", "root")
  properties.put("password", "123456")

  val url = "jdbc:mysql://localhost:3306/sakila"

  val df = spark.read.jdbc(url, "actor", properties)
  df.show()

Make sure the MySQL connector is on the classpath:

libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.49"
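
Even with the connector in place, a plain jdbc(url, table, properties) read runs as a single partition, which is what makes a 44 GB table crawl. If you need more than a handful of rows, Spark's partitioned jdbc() overload splits the scan across concurrent connections. A sketch reusing spark and properties from above, with url pointed at the question's database; the id column, its bounds, and the partition count are assumptions to replace with your table's real values:

  // Parallel JDBC read: Spark issues one bounded query per partition.
  val partitionedDf = spark.read.jdbc(
    url,
    "test_20160930_result_prop_alpha",  // the question's 44 GB table
    columnName = "id",                  // hypothetical numeric partition column
    lowerBound = 1L,                    // replace with the column's actual min
    upperBound = 100000000L,            // replace with the column's actual max
    numPartitions = 8,                  // concurrent connections/tasks
    connectionProperties = properties
  )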