当前位置: 首页 > 知识库问答 >
问题:

Flink:数据流左联接表。超级简单

左华灿
2023-03-14
        DataStream<String> sourceStream = streamEnv.fromElements("key_a", "key_b", "key_c", "key_d");

        Table lookupTable = tableEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("my_key", DataTypes.STRING()),
                        DataTypes.FIELD("my_value", DataTypes.STRING())
                ),
                Expressions.row("key_a", "value_a"),
                Expressions.row("key_b", "value_b")
        );

我想离开加入流到桌子上。

这显然是一个简化的演示场景。在使用更大的生产数据集之前,我想了解如何使用Flink API通过玩具数据集实现这一点。

表联接上的文档显示了如何联接两个表并取回另一个表,这不是我想要的:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#joins

DataStream joins上的docs显示在一个时间窗口上连接两个流,这也不是我想要的:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html

共有1个答案

汪胤
2023-03-14

我相信这就是你要找的。此示例将源流转换为动态表,将其与查找表联接,然后将生成的动态表转换回流以进行打印。

相反,您可以使用DataStream API对结果流进行进一步处理。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class JoinExample {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStream<String> sourceStream = env.fromElements("key_a", "key_b", "key_c", "key_d");
        Table streamTable = tableEnv.fromDataStream(sourceStream, $("stream_key"));

        Table lookupTable = tableEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("lookup_key", DataTypes.STRING()),
                        DataTypes.FIELD("lookup_value", DataTypes.STRING())
                ),
                Expressions.row("key_a", "value_a"),
                Expressions.row("key_b", "value_b")
        );

        Table resultTable = streamTable
                .join(lookupTable).where($("stream_key").isEqual($("lookup_key")))
                .select($("stream_key"), $("lookup_value"));

        DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);

        resultStream.print();

        env.execute();
    }
}

输出为

key_b,value_b
key_a,value_a
 类似资料:
  • 不幸的是,Kafka Flink连接器只支持-csv、json和avro格式。因此,我不得不使用较低级别的API(数据流)。 问题:如果我可以从datastream对象中创建一个表,那么我就可以接受在该表上运行的查询。它将使转换部分无缝和通用。是否可以在数据流对象上运行SQL查询?

  • Flink Stream支持内部连接表达式,如窗口连接,间隔连接。但不支持左连接/完全连接表达式。当然,窗口共组表达式可以实现相同的语义,即使事件已立即加入,也必须等待完全窗口大小的时间。我的问题是: 如何从设计角度解释Flink Stream不支持左连接/全连接说明? 我如何通过Flink DataStream API实现它(如果可以立即转发加入事件会更好)? 有没有办法扩展Flink Data

  • 我有一个表,需要离开外部连接两个不同的表。当我将表放入查询中两次,并将其与where子句中的self连接时(如下面的句子),它就可以工作了。我认为这不应该是正确的做法。如何编写select语句并将表与多个不同的表进行外部联接?

  • 根据我对左向外连接的理解,结果表的行永远不会比左表多...请让我知道如果这是错的... 我的左表是192572行8列。 我右边的表格是42160行和5列。 我的左表有一个名为“id”的字段,它与我的右表中名为“key”的列相匹配。 因此,我将它们合并为: 但是然后组合的形状是236569。 我有什么误会?

  • 我想添加表示来自其他表的计数的列。 我有三张桌子。 消息 主题 STARS_GIVED 我想以: 主题回顾 所以基本上,我想附上3列唯一值的计数(每个主题中给出的星数,在主题中有消息的唯一用户,以及每个主题中唯一消息的数量)。 我希望最终能够对类别进行筛选(看看两列)。 此外,我希望最终按我加入的计数排序。例如,我将有一个按钮,按“星星的数目”按升序排序,或按“用户的数目”按降序排序,等等。 我试

  • 问题内容: 我有两张桌子。表A列出了员工姓名。表B是一个复杂的表,其中包含有关员工打来的电话的信息。 我的目标是制作一个包含“名称”和“ callCount”列的表。我的目标是“左加入”和“分组依据”,但是我一直想念没有打过电话的员工。我怎样才能只保留名称并在其中放置零? 也许我很亲密,有人可以指出我的错字吗?在此先感谢您的帮助,以下是SQL: 问题答案: 这是一个JOIN而非NULL问题:您的过