当前位置: 首页 > 工具软件 > Sylph > 使用案例 >

sylph使用

赫连棋
2023-12-01

快速入门

下面将以StreamSql为实例,一步步地搭建出一个 分布式流计算应用,让你能快速的入门 SYLPH。

StreamSql是完全通过类sql来描述整个流计算的过程。主要需要描述: 数据源如何接入、如何计算、如何输出到外部存储; 例如计算每分钟的pv; 每5秒更新一次最近一分钟的uv。

demo1

下面例子演示将kafka topic TP_A_1,TP_A_2的数据实时写入mysql表mysql_table_sink

注意: mysql中表mysql_table_sink需要提前自行创建好,并且字段类型要和实际兼容

-- 定义数据流接入 
create source table topic1(
    _topic varchar,
    _key varchar,
    _message varchar,
    _partition integer,
    _offset bigint
) with (
    type = 'kafka',
    kafka_topic = 'TP_A_1,TP_A_2',
    "auto.offset.reset" = latest,
    kafka_broker = 'localhost:9092',
    kafka_group_id = 'streamSql_test1'
);
-- 定义数据流输出位置
create sink table mysql_table_sink(
    a1 varchar,
    a2 varchar,
    event_time bigint
) with (
    type = 'mysql',   -- ideal.sylph.plugins.flink.sink.MysqlSink.java
    userName = 'demo',
    password = 'demo',
    url = 'jdbc:mysql://localhost:3306/pop?characterEncoding=utf-8&useSSL=false',
    query = 'insert into mysql_table_sink values(${0},${1},${2})'
);
-- 描述数据流计算过程
insert into mysql_table_sink
select _topic,`_message`,cast(_offset as bigint) from topic1 where _key is not null
--where _key is not null 加上之后需要传key才可以

demo2

下面的例子 演示如何计算topic TP_A_1每分钟的uv

create source table topic1(
    _topic varchar,
    _key varchar,
    _message varchar,
    _partition integer,
    _offset bigint
) with (
    type = 'kafka',
    kafka_topic = 'TP_A_1',
    "auto.offset.reset" = latest,
    kafka_broker = 'localhost:9092',
    kafka_group_id = 'streamSql_test1'
);

create sink table mysql_uv_table_sink(
    user_id varchar,
    uv bigint,
    cnt_time date
) with (
    type = 'mysql',   -- ideal.sylph.plugins.flink.sink.MysqlSink.java
    userName = 'demo',
    password = 'demo',
    url = 'jdbc:mysql://localhost:3306/pop?characterEncoding=utf-8&useSSL=false',
    query = 'insert into mysql_uv_table_sink values(${0},${1},${2})'
);

with tb13 as (SELECT proctime
    ,row_get(rowline,0)as user_id
    FROM topic1, LATERAL TABLE(json_parser(_message,'user_id')) as T(rowline) 
    where cast(row_get(rowline,0) as varchar) is not null
)
insert into mysql_uv_table_sink
select 
user_id,
count(distinct user_id) as uv
,TUMBLE_START(proctime,INTERVAL '60' SECOND) AS window_start 
FROM tb13 GROUP BY user_id,TUMBLE(proctime,INTERVAL '60' SECOND)

 

 类似资料: