当前位置: 首页 > 工具软件 > StreamingPro > 使用案例 >

streamingpro mysql_使用StreamingPro 快速构建Spark SQL on CarbonData

羊浩广
2023-12-01

前言

CarbonData已经发布了1.0版本,变更还是很快的,这个版本已经移除了kettle了,使得部署和使用 变得很简单,而且支持1.6+ ,2.0+等多个Spark版本。

StreamingPro可以使得你很简单通过一个命令就能体验Carbondata,并且支持Http/JDBC的访问形态。

下载Spark发行版

比如我下载后的版本是这个: spark-1.6.3-bin-hadoop2.6。

下载StreamingPro

你需要一个数据库

因为我们用到了Hive 的mysql,所以你需要准备一个可以连接的数据库。只要能连接就行。如果没有,比如你是mac的话,用

brew install mysql

即可。然后brew services start mysql

创建一个数据库:

create database hive CHARACTER SET latin1

//如果数据库包字符异常啥的,启动完streamingpro后到数据库做如下更改:

alter table PARTITIONS convert to character set latin1;

alter table PARTITION_KEYS convert to character set latin1;

写一个hive-site.xml文件

javax.jdo.option.ConnectionURL

jdbc:mysql://127.0.0.1:3306/hive?createDatabaseIfNoExist=true

javax.jdo.option.ConnectionDriverName

com.mysql.jdbc.Driver

javax.jdo.option.ConnectionUserName

你的mysql账号

javax.jdo.option.ConnectionPassword

你的mysql密码

hive.metastore.warehouse.dir

file:///tmp/user/hive/warehouse

hive.exec.scratchdir

file:///tmp/hive/scratchdir

hive.metastore.uris

datanucleus.autoCreateSchema

true

可以启动了

//streamingpro jar包所处的目录,

//里面新建一个query.json文件,里面放一个大括号就行

SHome=/Users/allwefantasy/streamingpro

./bin/spark-submit --class streaming.core.StreamingApp \

--master local[2] \

--name sql-interactive \

--jars /Users/allwefantasy/.m2/repository/org/apache/carbondata/carbondata-spark/1.0.0-incubating/carbondata-spark-1.0.0-incubating.jar \

--files $SHome/hive-site.xml \

--conf "spark.sql.hive.thriftServer.singleSession=true" \

$SHome/streamingpro-0.4.8-SNAPSHOT-online-1.6.1.jar \

-streaming.name sql-interactive \

-streaming.job.file.path file://$SHome/query.json \

-streaming.platform spark \

-streaming.rest true \

-streaming.driver.port 9004 \

-streaming.spark.service true \

-streaming.thrift true \

-streaming.enableCarbonDataSupport true \

-streaming.enableHiveSupport true \

-streaming.carbondata.store /tmp/carbondata/store \

-streaming.carbondata.meta /tmp/carbondata/meta

参数比较多。大家不用管他。 这样http端口是9004, jdbc端口是 10000。

我们可以通过http创建一张表

//这里的sql是: CREATE TABLE IF NOT EXISTS test_table4(id string, name string, city string, age Int) STORED BY 'carbondata'

curl --request POST \

--url http://127.0.0.1:9004/run/sql \

--header 'cache-control: no-cache' \

--header 'content-type: application/x-www-form-urlencoded' \

--header 'postman-token: 731441ac-c398-9a1b-2f06-8725ddbe84cd' \

--data 'sql=CREATE%20TABLE%20IF%20NOT%20EXISTS%20test_table4(id%20string%2C%20name%20string%2C%20city%20string%2C%20age%20Int)%20STORED%20BY%20'\''carbondata'\'''

写入数据前,我们建立一个sample.csv的文件,

id,name,city,age

1,david,shenzhen,31

2,eason,shenzhen,27

3,jarry,wuhan,35

然后将这个文件导入:

//实际SQL:LOAD DATA LOCAL INPATH '/Users/allwefantasy/streamingpro/sample.csv' INTO TABLE test_table4

curl --request POST \

--url http://127.0.0.1:9004/run/sql \

--header 'cache-control: no-cache' \

--header 'content-type: application/x-www-form-urlencoded' \

--header 'postman-token: 5eb19ab4-653c-d05f-29ab-6003d7e83755' \

--data 'sql=LOAD%20DATA%20LOCAL%20INPATH%20%20'\''%2FUsers%2Fallwefantasy%2Fstreamingpro%2Fsample.csv'\''%20%20INTO%20TABLE%20test_table4'

这个使用我们可以用http查询:

//sql: SELECT * FROM test_table4

curl --request POST \

--url http://127.0.0.1:9004/run/sql \

--header 'cache-control: no-cache' \

--header 'content-type: application/x-www-form-urlencoded' \

--header 'postman-token: d99349ae-b226-8a4e-4d65-d92b1771c111' \

--data 'sql=SELECT%20*%20FROM%20test_table4'

你也可以写一个jdbc程序:

object ScalaJdbcConnectSelect {

def main(args: Array[String]) {

// connect to the database named "mysql" on the localhost

val driver = "com.mysql.jdbc.Driver"

val url = "jdbc:hive2://localhost:10000/default"

// there's probably a better way to do this

var connection:Connection = null

try {

// make the connection

Class.forName(driver)

connection = DriverManager.getConnection(url)

// create the statement, and run the select query

val statement = connection.createStatement()

val resultSet = statement.executeQuery("SELECT * FROM test_table4 ")

while ( resultSet.next() ) {

println(" city = "+ resultSet.getString("city") )

}

} catch {

case e => e.printStackTrace

}

connection.close()

}

}

完成。

 类似资料: