Sylph is a one-stop big-data stream computing platform: it compiles Stream SQL into distributed programs (Apache Flink jobs, among others) and submits them to an Apache Yarn cluster to run.
Sylph repository: https://github.com/harbby/sylph/
The development below is based on Sylph 0.5.0.
Goal: out of the box, Sylph only ships a Kafka source and a test type as stream inputs. To ingest a stream from HDFS, we need to build a custom HDFS data source.
**Development flow:** source config class ->> custom source class (extend one class, implement one interface) ->> define the types of the Row fields ->> read the HDFS file and emit the stream in run() ->> load the source ->> define a sample job ->> rebuild Sylph ->> run and check the result
**Note:** when defining the source class, the inheritance relation must be implements Source<DataStream<org.apache.flink.types.Row>>. Reportedly, Sylph 0.6 alpha3 and later also require a Plugin.class; that has not been looked into here, and Sylph 0.5 only enforces the inheritance constraint. The constraint also requires declaring the schema explicitly, i.e., implementing public TypeInformation<Row> getProducedType().
The implementation code follows. Runtime behavior: once the sample job is started, it periodically reads an HDFS file and prints the content to the console as (random key, one line of file content, event time).
The source config class:
package ideal.sylph.plugins.hdfs;

import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.etl.PluginConfig;

public class HdfsSourceConfig extends PluginConfig {
    private static final long serialVersionUID = 2L;

    private HdfsSourceConfig() {
    }

    @Name("fs.defaultFS")
    @Description("this is fs.defaultFS")
    private String fsAddress;

    @Name("hdfs_read_dir")
    @Description("this is the hdfs source file dir")
    private String hdfsFileDir;

    @Name("hdfs_time_interval")
    @Description("this is the hdfs source file update time interval (ms)")
    private Long hdfsTimeInterval;

    public String getFsAddress() {
        return fsAddress;
    }

    public String getHdfsFileDir() {
        return hdfsFileDir;
    }

    public Long getTimeInterval() {
        return hdfsTimeInterval;
    }
}
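Each @Name value above is exactly the property key used in the job's with(...) clause later on (fs.defaultFS, hdfs_read_dir, hdfs_time_interval). Sylph binds those properties onto the private fields when it instantiates the plugin, which is why the class keeps a private constructor and exposes only getters.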
First implementation of the custom source:
Point it at a single text file on HDFS and re-read that file's content at a fixed interval to feed the stream.
package ideal.sylph.plugins.hdfs;

import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.annotation.Version;
import ideal.sylph.etl.api.Source;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.shaded.guava18.com.google.common.base.Supplier;
import org.apache.flink.shaded.guava18.com.google.common.base.Suppliers;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;

/**
 * Reads a single HDFS file (the path includes the file name) on a timer.
 * Sample job settings:
 * type = 'HdfsSource',
 * fs.defaultFS = 'localhost:8020',
 * hdfs_read_dir = '/data/test.txt',
 * hdfs_time_interval = 10000
 */
@Name(value = "HdfsSource")
@Version("1.0.0")
@Description("this flink Hdfs source inputStream")
public class HdfsSource1 implements Source<DataStream<Row>> {
    private static final long serialVersionUID = 2L;
    private static final String[] HDFS_COLUMNS = new String[] {"key", "message", "event_time"};
    private final transient Supplier<DataStream<Row>> loadStream;

    public HdfsSource1(StreamExecutionEnvironment execEnv, HdfsSourceConfig config) {
        requireNonNull(execEnv, "execEnv is null");
        requireNonNull(config, "config is null");
        // Load the source lazily; memoize() ensures it is built only once
        this.loadStream = Suppliers.memoize(() -> this.createSource(execEnv, config));
    }

    public DataStream<Row> createSource(StreamExecutionEnvironment execEnv, HdfsSourceConfig config) {
        requireNonNull(execEnv, "execEnv is null");
        requireNonNull(config, "config is null");
        return execEnv.addSource(new MyDataSource(config));
    }

    @Override
    public DataStream<Row> getSource() {
        return loadStream.get();
    }

    /**
     * The custom SourceFunction.
     */
    public static class MyDataSource
            extends RichSourceFunction<Row>
            implements ResultTypeQueryable<Row> {
        /** Flag indicating whether the consumer is still running. */
        private volatile boolean running = true;
        private final HdfsSourceConfig hdfsSourceConfig;

        private MyDataSource(HdfsSourceConfig hdfsSourceConfig) {
            this.hdfsSourceConfig = hdfsSourceConfig;
        }

        // Flink calls run() once; it keeps emitting data until cancelled
        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            Configuration conf = new Configuration();
            // Full path of the source file, including the file name
            String file = hdfsSourceConfig.getHdfsFileDir();
            // Point fs.defaultFS at the namenode address
            conf.set("fs.defaultFS", "hdfs://" + hdfsSourceConfig.getFsAddress());
            Random random = new Random();
            while (running) {
                try {
                    // FileSystem.get() returns a cached instance, so it is not closed here
                    FileSystem fileSystem = FileSystem.get(conf);
                    Path path = new Path(file);
                    if (!fileSystem.exists(path)) {
                        System.out.println("File " + file + " does not exist");
                        return;
                    }
                    // Simulated event time: the data was produced up to 10 seconds ago
                    long eventTime = System.currentTimeMillis() - random.nextInt(10 * 1000);
                    // Open the HDFS file and read it line by line;
                    // try-with-resources closes both streams afterwards
                    try (FSDataInputStream in = fileSystem.open(path);
                            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in))) {
                        String lineTxt;
                        while ((lineTxt = bufferedReader.readLine()) != null) {
                            // One Row per line, following the declared schema
                            Row row = Row.of("key" + random.nextInt(10), lineTxt, eventTime);
                            // collect() emits into the continuous stream
                            ctx.collect(row);
                        }
                    }
                    // Wait before re-reading the file
                    TimeUnit.MILLISECONDS.sleep(hdfsSourceConfig.getTimeInterval());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        // Type definition for each field of the Row
        @Override
        public TypeInformation<Row> getProducedType() {
            TypeInformation<?>[] types = new TypeInformation<?>[] {
                    TypeExtractor.createTypeInfo(String.class),  // key
                    TypeExtractor.createTypeInfo(String.class),  // message
                    TypeExtractor.createTypeInfo(long.class)     // event_time
            };
            // The schema: field types plus column names
            return new RowTypeInfo(types, HDFS_COLUMNS);
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public void close()
                throws Exception {
            this.cancel();
            super.close();
        }
    }
}
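Before rebuilding all of Sylph, the SourceFunction can be smoke-tested in a plain local Flink job. The sketch below is illustrative only and bypasses Sylph: since HdfsSourceConfig keeps a private constructor and its fields are normally populated by Sylph, the test fills them via reflection. HdfsSourceLocalTest and its set() helper are made up for this sketch; the field names come from the config class above.

package ideal.sylph.plugins.hdfs;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;

public class HdfsSourceLocalTest {
    public static void main(String[] args) throws Exception {
        // Stand in for Sylph's config injection: build the config reflectively
        Constructor<HdfsSourceConfig> ctor = HdfsSourceConfig.class.getDeclaredConstructor();
        ctor.setAccessible(true);
        HdfsSourceConfig config = ctor.newInstance();
        set(config, "fsAddress", "localhost:8020");
        set(config, "hdfsFileDir", "/data/test.txt");
        set(config, "hdfsTimeInterval", 10_000L);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Wire the source into a local job and print every Row it emits
        new HdfsSource1(env, config).getSource().print();
        env.execute("HdfsSource1 smoke test");
    }

    private static void set(Object target, String fieldName, Object value) throws Exception {
        Field field = target.getClass().getDeclaredField(fieldName);
        field.setAccessible(true);
        field.set(target, value);
    }
}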
Second implementation of the custom source:
Point it at an HDFS directory and read the content of every file directly under it at a fixed interval to feed the stream.
package ideal.sylph.plugins.hdfs;

import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.annotation.Version;
import ideal.sylph.etl.api.Source;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.shaded.guava18.com.google.common.base.Supplier;
import org.apache.flink.shaded.guava18.com.google.common.base.Suppliers;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;

/**
 * Reads every file directly under an HDFS directory on a timer
 * (does not recurse into second-level directories).
 * Sample job settings:
 * type = 'HdfsSource2',
 * fs.defaultFS = 'localhost:8020',
 * hdfs_read_dir = '/data/',
 * hdfs_time_interval = 10000
 */
@Name(value = "HdfsSource2")
@Version("1.0.0")
@Description("this flink Hdfs source inputStream")
public class HdfsSource2 implements Source<DataStream<Row>> {
    private static final long serialVersionUID = 2L;
    private static final String[] HDFS_COLUMNS = new String[] {"key", "message", "event_time"};
    private final transient Supplier<DataStream<Row>> loadStream;

    public HdfsSource2(StreamExecutionEnvironment execEnv, HdfsSourceConfig config) {
        requireNonNull(execEnv, "execEnv is null");
        requireNonNull(config, "config is null");
        // Load the source lazily; memoize() ensures it is built only once
        this.loadStream = Suppliers.memoize(() -> this.createSource(execEnv, config));
    }

    public DataStream<Row> createSource(StreamExecutionEnvironment execEnv, HdfsSourceConfig config) {
        requireNonNull(execEnv, "execEnv is null");
        requireNonNull(config, "config is null");
        return execEnv.addSource(new HdfsSource2.MyDataSource(config));
    }

    @Override
    public DataStream<Row> getSource() {
        return loadStream.get();
    }

    /**
     * The custom SourceFunction.
     */
    public static class MyDataSource
            extends RichSourceFunction<Row>
            implements ResultTypeQueryable<Row> {
        /** Flag indicating whether the consumer is still running. */
        private volatile boolean running = true;
        private final HdfsSourceConfig hdfsSourceConfig;

        private MyDataSource(HdfsSourceConfig hdfsSourceConfig) {
            this.hdfsSourceConfig = hdfsSourceConfig;
        }

        // Flink calls run() once; it keeps emitting data until cancelled
        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            Configuration conf = new Configuration();
            URI uri = URI.create("hdfs://" + hdfsSourceConfig.getFsAddress());
            // Directory to scan
            String fileDir = hdfsSourceConfig.getHdfsFileDir();
            Random random = new Random();
            while (running) {
                try {
                    // FileSystem.get() returns a cached instance, so it is not closed here
                    FileSystem fileSystem = FileSystem.get(uri, conf);
                    Path path = new Path(fileDir);
                    // Simulated event time: the data was produced up to 10 seconds ago
                    long eventTime = System.currentTimeMillis() - random.nextInt(10 * 1000);
                    // globStatus() on a glob-free path returns the status of that path itself
                    FileStatus[] files = fileSystem.globStatus(path);
                    for (FileStatus file : files) {
                        if (file.isDirectory()) {
                            System.out.println("Directory: " + file.getPath());
                            // listFiles() returns a remote iterator over the files below the path;
                            // the second argument says whether to recurse into subdirectories
                            // (false = this directory only)
                            RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(file.getPath(), false);
                            while (iterator.hasNext()) {
                                LocatedFileStatus fileStatus = iterator.next();
                                // File name
                                System.out.println(fileStatus.getPath().getName());
                                // Open each HDFS file and read it line by line;
                                // try-with-resources closes the streams per file
                                try (FSDataInputStream in = fileSystem.open(fileStatus.getPath());
                                        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in))) {
                                    String lineTxt;
                                    while ((lineTxt = bufferedReader.readLine()) != null) {
                                        // One Row per line, following the declared schema
                                        Row row = Row.of("key" + random.nextInt(10), lineTxt, eventTime);
                                        // collect() emits into the continuous stream
                                        ctx.collect(row);
                                    }
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }
                        } else if (file.isFile()) {
                            // hdfs_read_dir should name a directory; skip anything else
                            System.out.println("Plain file, skipping: " + file.getPath());
                        } else if (file.isSymlink()) {
                            System.out.println("Symlink, skipping: " + file.getPath());
                        } else {
                            System.out.println("Other, skipping: " + file.getPath());
                        }
                    }
                    // Wait before re-reading the directory
                    TimeUnit.MILLISECONDS.sleep(hdfsSourceConfig.getTimeInterval());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        // Type definition for each field of the Row
        @Override
        public TypeInformation<Row> getProducedType() {
            TypeInformation<?>[] types = new TypeInformation<?>[] {
                    TypeExtractor.createTypeInfo(String.class),  // key
                    TypeExtractor.createTypeInfo(String.class),  // message
                    TypeExtractor.createTypeInfo(long.class)     // event_time
            };
            // The schema: field types plus column names
            return new RowTypeInfo(types, HDFS_COLUMNS);
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public void close()
                throws Exception {
            this.cancel();
            super.close();
        }
    }
}
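The second argument of listFiles() is what keeps this version at the top level. If files in second-level directories should be ingested too, the inner loop can be replaced by a recursive listing. A sketch of a helper that could slot into MyDataSource (readRecursively is a made-up name; everything else comes from the class above):

// Hypothetical helper for MyDataSource: emits every regular file under `root`,
// descending into subdirectories by passing `true` as the recursive flag
private void readRecursively(FileSystem fileSystem, Path root, Random random,
        long eventTime, SourceContext<Row> ctx) throws Exception {
    RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(root, true);
    while (iterator.hasNext()) {
        LocatedFileStatus fileStatus = iterator.next();
        try (FSDataInputStream in = fileSystem.open(fileStatus.getPath());
                BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            String lineTxt;
            while ((lineTxt = reader.readLine()) != null) {
                ctx.collect(Row.of("key" + random.nextInt(10), lineTxt, eventTime));
            }
        }
    }
}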
Define a sample job:
./sylph/sylph-dist/src/jobs/hdfs_test/job.flow
create function get_json_object as 'ideal.sylph.runner.flink.udf.UDFJson';  -- registers a JSON UDF (not used below)

create source table topic1(
    key varchar,
    message varchar,
    event_time bigint
) with (
    type = 'HdfsSource',
    fs.defaultFS = 'localhost:8020',
    hdfs_read_dir = '/data/test.txt',
    hdfs_time_interval = 10000
);

-- define where the stream is written
create sink table event_log(
    key varchar,
    message varchar,
    event_time bigint
) with (
    type = 'console',  -- print to console
    other = 'demo001'
);

insert into event_log
select key, message, event_time from topic1
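One thing to keep lined up: the column list of create source table must match what getProducedType() declares, in both names and types — key and message are varchar (the two String entries) and event_time is bigint (the long entry). If they diverge, the mismatch surfaces as an error when the job is compiled or started.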
./sylph/sylph-dist/src/jobs/hdfs_test/job.type
---
type: "StreamSql"
config:
  taskManagerMemoryMb: 2048
  taskManagerCount: 2
  taskManagerSlots: 2
  jobManagerMemoryMb: 1024
  checkpointInterval: -1
  checkpointTimeout: 600000
  parallelism: 4
  queue: "default"
  appTags:
    - "Sylph"
    - "Flink"