我正在StreamingTableEnvironment中将数据流注册为Flink表。
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.createTemporaryView(
"foo",
dataStream,
$("f0").as("foo-id"),
$("f1").as("foo-value")
)
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties ROW(`foo` VARCHAR)
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
);
我没有使用Create Table命令with connector注册流,因为数据格式是自定义的,这就是我为什么要注册流的原因。
我尝试了一些返回类型和嵌套,这个解决方案似乎很管用:
DataStream<Row> testDataStream = env.fromCollection(Arrays.asList(
Row.of("sherin", 1L, Row.of(100L, 200L)),
Row.of("thomas", 1L, Row.of(100L, 200L))
)).returns(
Types.ROW(
Types.STRING,
Types.LONG,
Types.ROW_NAMED(
new String[]{"val1", "val2"},
Types.LONG,
Types.LONG)
));
tableEnv.createTemporaryView(
"foo",
testDataStream,
$("f0").as("name"),
$("f1").as("c"),
$("f2").as("age"));
Table testTable = tableEnv.sqlQuery(
"select name, c, age.val1, age.val2 from foo"
);
DataStream<Row> result = tableEnv.toAppendStream(
testTable,
TypeInformation.of(Row.class)
);
result.print().setParallelism(2);
如果有更好的方法的话,我仍然愿意接受各种想法。
我现在一直在查看Spring Cloud模式注册表和汇合模式注册表。我可以看到一些区别,例如Spring Cloud模式注册表将模式保存在普通数据库中,默认情况下保存在h2中,而汇合模式注册表保存在kafka主题中。 spring云模式注册表的这种方法是否会对性能产生任何影响。据我所知,即使数据保留在主题上,以防汇合,查询它时仍然会有延迟。但会有重大影响吗? 我还可以看到,spring云模式注册表
问题内容: 我在这里分叉了一个很棒的项目,并且刚刚学习了一些东西,就把它弄乱了。我不知道的问题是有关自定义编组的一些问题,如果您在这里看到,您可以看到它解组了包含一个字段的结构,然后使用该字段将其解组。除了嵌套的情况,这一切都很好。因此,最好的例子是: 只需说您有一个善意的结构是a ,那么就可以将其编组。然后具有3个保留在字段中的类型。这些孩子最终成为我的问题类型。我如何才能解决嵌套数据的问题?这
我在这里分叉了一个很棒的项目,并且一直在学习一些东西。我无法弄清楚的问题是关于自定义解编的一些事情,如果您看到这里,您可以看到这解封了包含字段的结构,然后使用字段解组。除了嵌套情况外,这一切都很好用。所以最好的事情是一个例子: 假设你有一个< code>Thing结构,它是一个< code>listing,也就是< code>Thing。数据这被解组为类型< code >列表。那么< code>l
假设我有这些实体: null
问题内容: 假设我有一个像这样的领域模型: 现在,我可以像这样创建一个教师比较器: 但是,我如何像这样在嵌套字段上比较Lecture? 我无法在模型上添加方法。 问题答案: 您不能嵌套方法引用。您可以改用lambda表达式: 无需反向顺序,它就不再那么冗长了: 注意:在某些情况下,您需要明确声明泛型类型。对于 例如,下面的代码不会没有工作,之前在Java中8。 较新的Java版本具有更好的自动类型
我有以下课程: 问题是我只能修改类。