当前位置: 首页 > 知识库问答 >
问题:

如何用嵌套字段注册Flink表模式?

楚奇逸
2023-03-14

我正在StreamingTableEnvironment中将数据流注册为Flink表。

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.createTemporaryView(
       "foo",
       dataStream,
       $("f0").as("foo-id"),
       $("f1").as("foo-value") 
)
CREATE TABLE input(
             id VARCHAR,
             title VARCHAR,
             properties ROW(`foo` VARCHAR)
) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
);

我没有使用Create Table命令with connector注册流,因为数据格式是自定义的,这就是我为什么要注册流的原因。

共有1个答案

印曜灿
2023-03-14

我尝试了一些返回类型和嵌套,这个解决方案似乎很管用:

DataStream<Row> testDataStream = env.fromCollection(Arrays.asList(
     Row.of("sherin", 1L, Row.of(100L, 200L)),
     Row.of("thomas", 1L, Row.of(100L, 200L))
)).returns(
    Types.ROW(
             Types.STRING,
             Types.LONG,
             Types.ROW_NAMED(
                  new String[]{"val1", "val2"},
                  Types.LONG,
                  Types.LONG)
));

tableEnv.createTemporaryView(
    "foo",
    testDataStream,
    $("f0").as("name"),
    $("f1").as("c"),
    $("f2").as("age"));

Table testTable = tableEnv.sqlQuery(
     "select name, c, age.val1, age.val2 from foo"
);

DataStream<Row> result = tableEnv.toAppendStream(
    testTable,
    TypeInformation.of(Row.class)
);

result.print().setParallelism(2);

如果有更好的方法的话,我仍然愿意接受各种想法。

 类似资料:
  • 我现在一直在查看Spring Cloud模式注册表和汇合模式注册表。我可以看到一些区别,例如Spring Cloud模式注册表将模式保存在普通数据库中,默认情况下保存在h2中,而汇合模式注册表保存在kafka主题中。 spring云模式注册表的这种方法是否会对性能产生任何影响。据我所知,即使数据保留在主题上,以防汇合,查询它时仍然会有延迟。但会有重大影响吗? 我还可以看到,spring云模式注册表

  • 问题内容: 我在这里分叉了一个很棒的项目,并且刚刚学习了一些东西,就把它弄乱了。我不知道的问题是有关自定义编组的一些问题,如果您在这里看到,您可以看到它解组了包含一个字段的结构,然后使用该字段将其解组。除了嵌套的情况,这一切都很好。因此,最好的例子是: 只需说您有一个善意的结构是a ,那么就可以将其编组。然后具有3个保留在字段中的类型。这些孩子最终成为我的问题类型。我如何才能解决嵌套数据的问题?这

  • 我在这里分叉了一个很棒的项目,并且一直在学习一些东西。我无法弄清楚的问题是关于自定义解编的一些事情,如果您看到这里,您可以看到这解封了包含字段的结构,然后使用字段解组。除了嵌套情况外,这一切都很好用。所以最好的事情是一个例子: 假设你有一个< code>Thing结构,它是一个< code>listing,也就是< code>Thing。数据这被解组为类型< code >列表。那么< code>l

  • 假设我有这些实体: null

  • 问题内容: 假设我有一个像这样的领域模型: 现在,我可以像这样创建一个教师比较器: 但是,我如何像这样在嵌套字段上比较Lecture? 我无法在模型上添加方法。 问题答案: 您不能嵌套方法引用。您可以改用lambda表达式: 无需反向顺序,它就不再那么冗长了: 注意:在某些情况下,您需要明确声明泛型类型。对于 例如,下面的代码不会没有工作,之前在Java中8。 较新的Java版本具有更好的自动类型

  • 我有以下课程: 问题是我只能修改类。