当前位置: 首页 > 知识库问答 >
问题:

AppendStreamTableSink不支持使用节点Join产生的更新更改(joinType=[InnerJoin]

陈项禹
2023-03-14

当我使用Flink SQL执行以下语句时,错误报告如下:

请求

根据< code>user_id字段对user_behavior_kafka_table中的数据进行分组,然后取出每组中< code>ts字段值最大的数据

Excute sql

SELECT user_id,item_id,ts FROM user_behavior_kafka_table AS a 
WHERE ts = (select max(b.ts) 
FROM user_behavior_kafka_table AS b 
WHERE a.user_id = b.user_id );

Flink 版本

1.11.2

错误消息

AppendStreamTableSink doesn't support consuming update changes which is produced by node Join(joinType=[InnerJoin], where=[((user_id = user_id0) AND (ts = EXPR$0))], select=[user_id, item_id, ts, user_id0, EXPR$0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])

作业部署

在纱线上

表消息

    < li >来自消费者kafka主题的user_behavior_kafka_table数据

{“user_id”:“aaa”,“item_id”:“11-222-333”,“comment”:“aaa access item at”,“ts”:100}

{"user_id":"ccc","item_id":"11-222-334","注释":"ccc访问项at","ts": 200}

{“user_id”:“ccc”,“item_id:”11-222-333,“comment:”ccc访问项在“,”ts:”300}

{“user_id”:“bbb”,“item_id”:“11-222-334”,“comment”:“bbb access item at”,“ts”:200}

{“user_id”:“aaa”,“item_id:”11-222-333,“comment:”aaa访问项在“,”ts:”200}

{user_id:aaa,item_id:11-222-334,注释:aaa访问项at,ts:400}

{"user_id":"ccc","item_id":"11-222-333","注释":"ccc访问项at","ts": 400}

{user_id:vvv,item_id:11-222-334,注释:vvv访问项at,ts:200}

{“user_id”:“bbb”,“item_id:”11-222-333,“comment:”bbb访问项在“,”ts:”300}

{user_id:aaa,item_id:11-222-334,注释:aaa访问项at,ts:300}

{“user_id”:“ccc”,“item_id:”11-222-333,“comment:”ccc访问项在“,”ts:”100}

{“user_id”:“bbb”,“item_id:”11-222-334,“comment:”bbb访问项在“,”ts:”100}

  • user_behavior_hive_table 预期结果

{user_id:aaa,item_id:11-222-334,注释:aaa访问项at,ts:400}

{“user_id”:“bbb”,“item_id:”11-222-333,“comment:”bbb访问项在“,”ts:”300}

{"user_id":"ccc","item_id":"11-222-333","注释":"ccc访问项at","ts": 400}

{user_id:vvv,item_id:11-222-334,注释:vvv访问项at,ts:200}

共有1个答案

刁丰羽
2023-03-14

要从该查询中获得预期结果,需要以批处理模式执行该查询。作为一个流式查询,Flink SQL planner无法处理它,如果可以,它将生成一个结果流,其中每个user_id的最后结果将与预期结果匹配,但会有额外的中间结果。

例如,对于用户 aaa,将显示以下结果:

aaa 11-222-333 100
aaa 11-222-333 200
aaa 11-222-334 400

但是将跳过 ts=300 的行,因为它从来不是 ts 最大值的行。

如果要在流式处理模式下执行此操作,请尝试将其重新设置为 top-n 查询:

SELECT user_id, item_id, ts FROM
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC) AS row_num
  FROM user_behavior_kafka_table)
WHERE row_num = 1;

我相信这应该有效,但我不能轻易测试它。

 类似资料:
  • 问题内容: 我有一个XML文件test.xml 我正在尝试在表单提交中使用PHP更新节点xName和yName。因此,我已经使用simplexml_load_file()加载了文件。PHP表单操作代码如下 我想更新节点值,但是上面的代码似乎不正确。谁能帮助我纠正它? 更新:我的问题有点类似于使用PHP更新XML文件,但是在这里,我要从外部文件加载XML,同时还要更新元素而不是属性。那就是我的困惑所

  • 我是XSLT新手。我想根据其他子节点的条件更改XML中的根节点。但子节点始终保持不变。例如,我有以下XML: 我喜欢将XML更改为: 这意味着依赖于<代码> 我不想在每个<代码>

  • 在oh-my-zsh文件中,我通过更改. zshrc文件中的行来添加nvm。 插件=(git nvm)//添加了nvm 现在我看到了nvm,并在终端上使用它 但是当我改变下面看到的版本。 然后我关闭终端,打开一个新的,检查版本,它仍然显示使用以前的版本!它没有保持变化。 新窗口

  • 问题内容: 使用Hibernate,我想根据条件更新数据库中的数据,但是出现以下错误:“遍历的节点不能为空” 这是我的数据库描述: 这是我的JPA: 我究竟做错了什么?如果我将LEFT JOIN移到SET之前: 我得到:“正在期待SET,找到了LEFT” 如果我删除联接: 我得到:“非法尝试取消对集合的引用”。 更新值的正确方法是什么? 谢谢你的帮助! 问题答案: 第4章中的JPA 2.0规范包含

  • 我有两个表,喜欢用另一个表的值更新其中一个表。 我已经尝试过这些查询,但SQLite不支持使用UPDATE进行连接。任何人都可以提出查询。谢谢您的帮助。

  • 我以前从未使用过JPQL,我需要用它来批量更新特定列中的值。该列存在于表中,但实体类是在没有变量的情况下设置的。我不能使用确切的数据,但设置会像这样给你一个想法: 数据库列 JPA 实体类 在 Books 实体类中,有一个 getter 和 这两个问题都是b.作者提出的。authorId和b.author是无效的标识符。是否可以编写两个单独的查询?1个查询作为select语句,我可以使用连接,然后