Spark 是一个可扩展的可编程框架,用于数据集的大规模分布式处理, 称为弹性分布式数据集(Resilient Distributed Datasets,RDD)。
Spark Streaming 是 Spark API 核心的扩展,它支持来自各种来源的流处理。
StreamingPro 是一个可扩展、可编程的 Spark Streaming 框架(也包括 Spark,Storm),可以轻松地用于构建流式应用。
StreamingPro 支持以 Spark、Flink 等作为底层分布式计算引擎,通过一套统一的配置文件完成批处理、流式计算与Rest 服务的开发。 特点有:
使用Json描述文件完成流式,批处理的开发,不用写代码。
支持SQL Server,支持XSQL/MLSQL(重点),完成批处理,机器学习,即席查询等功能。
标准化输入输出,支持UDF函数注册,支持自定义模块开发
支持Web化管理Spark应用的启动,监控
如果更细节好处有:
跨版本:StreamingPro可以让你不用任何变更就可以轻易的运行在spark 1.6/2.1/2.2上。
新语法:提供了新的DSl查询语法/Json配置语法
程序的管理工具:提供web界面启动/监控 Spark 程序
功能增强:2.1之后Structured Streaming 不支持kafka 0.8/0.9 ,Structured,此外还有比如spark streaming 支持offset 保存等
简化Spark SQL Server搭建成本:提供rest接口/thrift 接口,支持spark sql server 的负载均衡,自动将driver 注册到zookeeper上
探索更多的吧
模块名 | 描述 | 备注 |
---|---|---|
streamingpro-commons | 一些基础工具类 | |
streamingpro-spark-common | Spark有多个版本,所以可以共享一些基础的东西 | |
streamingpro-flink | streamingpro对flink的支持 | |
streamingpro-spark | streamingpro对spark 1.6.x的支持 | |
streamingpro-spark-2.0 | streamingpro对spark 2.x的支持 | |
streamingpro-api | streamingpro把底层的spark API暴露出来,方便用户灵活处理问题 | |
streamingpro-manager | 通过该模块,可以很方便的通过web界面启动,管理,监控 spark相关的应用 | |
streamingpro-dls | 自定义connect,load,select,save,train,register等语法,便于用类似sql的方式做批处理任务,机器学习等 |
如果你使用StreamingPro,那么所有的工作都是在编辑一个Json配置文件。通常一个处理流程,会包含三个概念:
多个输入
多个连续/并行的数据处理
多个输出
StreamingPro会通过'compositor'的概念来描述他们,你可以理解为一个处理单元。一个典型的输入compositor如下:
{ "name": "batch.sources", "params": [ { "path": "file:///tmp/hdfsfile/abc.txt", "format": "json", "outputTable": "test" }, { "path": "file:///tmp/parquet/", "format": "parquet", "outputTable": "test2" } ] }
batch.sources
就是一个compositor的名字。 这个compositor 把一个本地磁盘的文件映射成了一张表,并且告知系统,abc.txt里的内容 是json格式的。这样,我们在后续的compositor模块就可以使用这个test
表名了。通常,StreamingPro希望整个处理流程, 也就是不同的compositor都采用表来进行衔接。
StreamingPro不仅仅能做批处理,还能做流式,流式支持Spark Streaming,Structured Streaming。依然以输入compositor为例,假设 我们使用的是Structured Streaming,则可以如下配置。
{ "name": "ss.sources", "params": [ { "format": "kafka9", "outputTable": "test", "kafka.bootstrap.servers": "127.0.0.1:9092", "topics": "test", "path": "-" }, { "format": "com.databricks.spark.csv", "outputTable": "sample", "header": "true", "path": "/Users/allwefantasy/streamingpro/sample.csv" } ] }
第一个表示我们对接的数据源是kafka 0.9,我们把Kafka的数据映射成表test。 因为我们可能还需要一些元数据,比如ip和城市的映射关系, 所以我们还可以配置一些其他的非流式的数据源,我们这里配置了一个smaple.csv文件,并且命名为表sample。
如果你使用的是kafka >= 1.0,则 topics 参数需要换成'subscribe',并且使用时可能需要对内容做下转换,类似:
select CAST(key AS STRING) as k, CAST(value AS STRING) as v from test
启动时,你需要把-streaming.platform 设置为 ss
。
如果我们的输入输出都是Hive的话,可能就不需要batch.sources/batch.outputs 等组件了,通常一个batch.sql就够了。比如:
"without-sources-job": { "desc": "-", "strategy": "spark", "algorithm": [], "ref": [], "compositor": [ { "name": "batch.sql", "params": [ { "sql": "select * from hiveTable", "outputTableName": "puarquetTable" } ] }, { "name": "batch.outputs", "params": [ { "format": "parquet", "inputTableName": "puarquetTable", "path": "/tmp/wow", "mode": "Overwrite" } ] } ], "configParams": { } }
在批处理里,batch.sources/batch.outputs 都是可有可无的,但是对于流式程序,stream.sources/stream.outputs/ss.sources/ss.outputs 则是必须的。
Property Name | Default | Meaning |
---|---|---|
streaming.name | (none) required | 等价于 spark.app.name |
streaming.master | (none) required | 等价于 spark.master |
streaming.duration | 10 seconds | spark streaming 周期,默认单位为秒 |
streaming.rest | true/false,default is false | 是否提供http接口 |
streaming.spark.service | true/false,default is false | 开启该选项时,streaming.platform必须为spark. 该选项会保证spark实例不会退出 |
streaming.platform | spark/spark_streaming/ss/flink,default is spark | 基于什么平台跑 |
streaming.checkpoint | (none) | spark streaming checkpoint 目录 |
streaming.kafka.offsetPath | (none) | kafka的偏移量保存目录。如果没有设置,会保存在内存中 |
streaming.driver.port | 9003 | 配置streaming.rest使用,streaming.rest为true,你可以设置一个http端口 |
streaming.spark.hadoop.* | (none) | hadoop configuration,eg. -streaming.spark.hadoop.fs.defaultFS hdfs://name:8020 |
streaming.job.file.path | (none) | 配置文件路径,默认从hdfs加载 |
streaming.jobs | (none) | json配置文件里的job名称,按逗号分隔。如果没有配置该参数,默认运行所有job |
streaming.zk.servers | (none) | 如果把spark作为一个server,那么streamingpro会把driver地址注册到zookeeper上 |
streaming.zk.conf_root_dir | (none) | 配置streaming.zk.servers使用 |
streaming.enableHiveSupport | false | 是否支持Hive |
streaming.thrift | false | 是否thrift server |
streaming.sql.source.[name].[参数] | (none) | batch/ss/stream.sources 中,你可以替换里面的任何一个参数 |
streaming.sql.out.[name].[参数] | (none) | batch/ss/stream.outputs 中,你可以替换里面的任何一个参数 |
streaming.sql.params.[param-name] | (none) | batch/ss/stream.sql中,你是可以写表达式的,比如 select * from :table, 之后你可以通过命令行传递该table参数 |
后面三个参数值得进一步说明:
假设我们定义了两个数据源,firstSource,secondSource,描述如下:
{ "name": "batch.sources", "params": [ { "name":"firstSource", "path": "file:///tmp/sample_article.txt", "format": "com.databricks.spark.csv", "outputTable": "article", "header":true }, { "name":"secondSource", "path": "file:///tmp/sample_article2.txt", "format": "com.databricks.spark.csv", "outputTable": "article2", "header":true } ] }
我们希望path不是固定的,而是启动时候决定的,这个时候,我们可以在启动脚本中使用-streaming.sql.source.[name].[参数] 来完成这个需求。 比如:
-streaming.sql.source.firstSource.path file:///tmp/wow.txt
这个时候,streamingpro启动的时候会动态将path 替换成你要的。包括outputTable等都是可以替换的。
有时候我们需要定时执行一个任务,而sql语句也是动态变化的,具体如下:
{ "name": "batch.sql", "params": [ { "sql": "select * from test where hp_time=:today", "outputTableName": "finalOutputTable" } ] },
这个时候我们在启动streamingpro的时候,通过参数:
-streaming.sql.params.today "2017"
动态替换 sql语句里的:today
准备工作 下载Spark 1.6.2 下载StreamingPro 我们假设你下载的StreamingPro包在/tmp目录下。 复制如下模板 { "esToCsv": { "desc": "测试", "strategy": "streaming.core.strategy.SparkStreamingStrategy", "algorithm": [], "r
前言 最近给StreamingPro提供了两个新的模块,一个是streamingpro-manager,一个是streamingpro-api。 streamingpro-manager主要是提供一个部署,管理Spark任务的web服务。streamingpro则是增强了定制StreamingPro的能力。当然,还有就是对原有功能的增强,比如StreamingPro SQL Server 支持异步
前言 CarbonData已经发布了1.0版本,变更还是很快的,这个版本已经移除了kettle了,使得部署和使用 变得很简单,而且支持1.6+ ,2.0+等多个Spark版本。 StreamingPro可以使得你很简单通过一个命令就能体验Carbondata,并且支持Http/JDBC的访问形态。 下载Spark发行版 比如我下载后的版本是这个: spark-1.6.3-bin-hadoop2.6
如何命令行指定输如输出的参数? 下面有个输出的例子: { "name": "stream.outputs", "params": [ { "name":"jack", "format": "jdbc", "path": "-", "driver":"com.mysql.jdbc.Driver", "url":"jdbc:mysql://127.0.0.1/test?characterEncodin
我们知道StreamingPro 是一个完全SQL/Script化的,基于Spark平台的一套生产力工具。但是不可避免的,我们可能希望直接操作SqlContext或者使用原生的DataFrame API。 这里我们通过script 让大家支持这个功能: { "name": "batch.script.df", "params": [ {
前言 受spark sql在喜马拉雅的使用之xql 这篇文章影响,我发现类似下面这种语法是极好的: //加载mysql表 load jdbc.`mysql1.tb_v_user` as mysql_tb_user; //处理后映射成spark临时表 select * from mysql_tb_user limit 100 as result_csv; //保存到文件里 save result_c
前言 今天看到了一篇 AI前线的文章谷歌BigQuery ML正式上岗,只会用SQL也能玩转机器学习!。正好自己也在力推 StreamingPro的MLSQL。 今天就来对比下这两款产品。 StreamingPro简介 StreamingPro是一套基于Spark的数据平台,MLSQL是基于StreamingPro的算法平台。利用MLSQL,你可以用类似SQL的方式完成数据的ETL,算法训练,模型
前言 前些天可以让批处理的配置变得更优雅StreamingPro 支持多输入,多输出配置,现在流式计算也支持相同的配置方式了。 另外未来等另外一个项目稳定,会释放出来配合StreamingPro使用,它可以让你很方便的读写HBase,比如可以为HBase 表 添加mapping,类似ES的做法,也可以不用mapping,系统会自动为你创建列(familly:column作为列名),或者将所有列合并
前言 之前已经写过一篇文章,StreamingPro 支持Spark Structured Streaming,不过当时只是玩票性质的,因为对Spark 2.0+ 版本其实也只是尝试性质的,重点还是放在了spark 1.6 系列的。不过时间在推移,Spark 2.0+ 版本还是大势所趋。所以这一版对底层做了很大的重构,StreamingPro目前支持Flink,Spark 1.6+, Spark
我们在WebLogic11g下使用JSF2.1+PrimeFaces6.0+PrimeFaces-Extensions6.0.0、mojarra 2.1.7。 单击p:commandButton后,DOM inspector显示对话框已在正文和html标记之外创建,如下一个图像所示: 在正文外部呈现的对话框 如果我们使用相同的代码(没有帧)创建一个新的.xhtml,并单击p:commandButt
许多的 PHP 开发者都使用框架,而不是重新造轮子来构建 Web 应用。框架抽象了许多底层常用的逻辑,并提供了有益又简便的方法來完成常见的任务。 你并不一定要在每个项目中都使用框架。有时候原生的 PHP 才是正确的选择,但如果你需要一个框架,那么有如下三种主要类型: 微型框架 全栈框架 组件框架 微型框架基本上是一个封装的路由,用来转发 HTTP 请求至一个闭包,控制器,或方法等等,尽可能地加快开
主要内容:iframe - 设置高度与宽度,实例,iframe - 移除边框,实例,使用 iframe 来显示目标链接页面,实例,HTML iframe 标签通过使用框架,你可以在同一个浏览器窗口中显示不止一个页面。 iframe语法: 该URL指向不同的网页。 iframe - 设置高度与宽度 height 和 width 属性用来定义iframe标签的高度与宽度。 属性默认以像素为单位, 但是你可以指定其按比例显示 (如:"80%")。 实例 <iframe loading="lazy" s
tornado.web — RequestHandler and Application classes Thread-safety notes Request handlers Entry points Input Output Cookies Other Application configuration Decorators Everything else tornado.template
UI /主题框架和组件 LESS支持下表中列出的一些UI/Theme框架 - Sr.No. 框架和描述 1 1pxdeep 它是平坦的Bootstrap 3主题,提供强大的配色方案控件。 2 Bootflat 它是一个基于Bootstrap的开源框架。 3 BootPress 它是一个基于平面文件CMS的PHP框架 4 Bootstrap 它是功能强大的移动第一前端框架,可实现更快,更轻松的Web
本章介绍 Decoder(解码器) Encoder(编码器) Codec(编解码器) 在前面的章节中,我们讨论了连接到拦截操作或数据处理链的不同方式,展示了如何使用 ChannelHandler 及其相关的类来实现几乎任何一种应用程序所需的逻辑。但正如标准架构模式通常有专门的框架,通用处理模式很适合使用目标实现,可以节省我们大量的开发时间和精力。 在这一章,我们将研究编码和解码——数据从一种特定协
WLAN框架接口 函数 rt_err_t rt_wlan_set_mode (const char *dev_name, rt_wlan_mode_t mode) 注册WLAN设备到WLAN设备框架 rt_err_t rt_wlan_connect (const char *ssid, const char *password) 同步连接热点 rt_err_t rt_wlan
通过使用框架,你可以在同一个浏览器窗口中显示不止一个页面。 iframe语法: <iframe src="URL"></iframe> 该URL指向不同的网页。 Iframe - 设置高度与宽度 height 和 width 属性用来定义iframe标签的高度与宽度。 属性默认以像素为单位, 但是你可以指定其按比例显示 (如:"80%")。 <iframe src="demo_iframe.htm