当前位置: 首页 > 工具软件 > Apache Beam > 使用案例 >

apache beam 入门之beam-sql

冷夜洛
2023-12-01

目录:apache beam 个人使用经验总结目录和入门指导(Java)

就像spark-sql 一样,apache beam也有beam-sql, 就是能够输入1张模拟数据表, 然后通过sql语句来实现计算。
举个例子,我们不希望在数据源端执行 select * from tableA left join talbeB on tableA.id = tableB.id where tableA.id < 10 这句话, 因为这样很占用数据源端的计算资源(尤其是hive这类数仓), 所以会希望把tableA和tableB的所有数据读入到自己的计算集群中,然后在beam里去执行这一句sql。

如何创建模拟表

首先要创建1个表的schema(模式),或者说叫做表结构。 beam的schema采用builder模式进行建立。

Schema tableASchema = Schema.builder()
.addInt32Field("id")
.addStringField("name")
.build();

接着同样用builder模式去创建1条表的行记录

Row row1 = Row.withSchema(tableASchema)
.addValue(1)
.addValue("tony")
.build();

注意addValue的时候, 要按照schma里添加字段的顺序和类型来添加数据, 不要添加错了。
我们多造几条数据

Row row2 = Row.withSchema(tableASchema)
.addValue(2)
.addValue("tom")
.build();

Row row3 = Row.withSchema(tableASchema)
.addValue(3)
.addValue("jack")
.build();

造好后,用Create进行模拟表的创建,主要不要遗漏setRowSchema,否则会无法识别编码。

PCollection<Row> pTableA = pipeline.apply(Create.of(row1, row2, row3))
.setRowSchema(tableASchema);

这时候在pipeline运行时,pTableA数据集里就会塞进3行记录,但是现在还差1个表名。因此需要把数据集pTable变成PCollectionTuple

PCollectionTuple tupleTableA = PCollectionTuple.of(new TupleTag<>("tableA"), pTableA);

这时候"tableA"这个名字就通过new TupleTag赋予了pTableA,此时tupleTableA可以理解为1张模拟表了。

执行beam-sql

执行beam-sql前,要先添加如下依赖:

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-sql</artifactId>
<version>${beam.version}</version>
</dependency>

接着用SQLTransform这个sdk即可实现beam-sql

// 执行bema-sql
PCollection<Row> afterSelectTableA
= tupleTableA.apply(SqlTransform.query("select name from tableA where id <= 2"));

// 打印结果
afterSelectTableA.apply(ParDo.of(new PrintStrFn()));

pipeline.run().waitUntilFinish();
 类似资料: