当前位置: 首页 > 知识库问答 >
问题:

带debezium-json格式的Flink 1.11

皇甫波峻
2023-03-14

在Flink 1.11中,我正在尝试使用debezium格式,以下应该可以工作,对吗?我在试着遵循文档[1]

    TableResult products = bsTableEnv.executeSql(
            "CREATE TABLE products (\n" +
                    "  id BIGINT,\n" +
                    "  name STRING,\n" +
                    "  description STRING,\n" +
                    "  weight DECIMAL(10, 2)\n" +
                    ") WITH (\n" +
                    " 'connector' = 'kafka',\n" +
                    " 'topic' = 'dbserver1.inventory.products',\n" +
                    " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
                    " 'properties.group.id' = 'testGroup',\n" +
                    "'scan.startup.mode'='earliest-offset',\n" +
                    " 'format' = 'debezium-json'" +
                    ")"
    );

    bsTableEnv.executeSql("SHOW TABLES").print(); // This seems to work; 
    bsTableEnv.executeSql("SELECT id FROM products").print();

输出代码段/异常:

+------------+
| table name |
+------------+
|   products |
+------------+
1 row in set
Exception in thread "main" org.apache.flink.table.api.TableException: AppendStreamTableSink doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, products]], fields=[id, name, description, weight])

我已经验证了Debezium设置,dbserver1中有消息。库存产品主题。我可以使用其他方法阅读Flink中的Kafka主题,但如前所述,我希望使用debezium json格式。

此外,我知道Flink 1.12引入了新的Kafka Upett连接器,但我现在无法使用1.11。

我对Flink很陌生,所以我很可能错过了一些明显的东西。

提前感谢

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html

共有1个答案

廉高邈
2023-03-14

似乎问得太早了。如果它可能对其他人有帮助,我可以让它一起工作

Table results = bsTableEnv.sqlQuery("SELECT id, name FROM products");
bsTableEnv.toRetractStream(results, Row.class).print();
 类似资料:
  • 我怎样组合桌子。包括列表和列。包括如果需要快照一个表的一部分和另一个表的完整数据,请在Debezium连接器配置中列出? 连接器配置示例: 有了这些,我得到了Kafka表格1的所有栏目。 只管理如何使用不同的连接器来完成。 我想使用一个连接器获取表2中的所有列和表1中的两列,可以吗?

  • Google最终增加了对Cloudsql、postgresql逻辑复制/解码支持的支持。我正在尝试使用Debezium捕获数据库表上的更改,然后将json中的更改发布到Gcp PubSub。我使用带有Gcp Pubsub接收器的Debezium服务器只是为了验证连接是否工作,但不断得到以下错误消息 所以它似乎默认插件为,但是我在环境变量中设置了。根据这个文档https://hub.docker.c

  • 我有一些具有bg_和cp_前缀数据库表,如“bg_table1”、“bg_table2”和“cp_table1”,还有一些没有任何前缀的表,如my_table1和user_action等等。 我有两个debezium postgreSQL连接器,并尝试通过以下操作配置属性:debezium-http://debezium.io/docs/connectors/postgreSQL/#connect

  • 问题内容: 我正在构建JSON对象,并使用JQuery ajax将其传递到服务器。 但是,当我的blogBody变量包含代码时,将失败并显示错误消息: 有人可以说错误在哪里以及如何解决? 问题答案: 在JSON中,键必须用双引号()而不是单引号()引起。同样,字符串值必须用双引号而不是单引号引起来。您正在使用单引号。例如,标题文字前后。 因此,至少,您需要交换这些引号,例如:

  • 问题内容: 因此,可以说JSON响应为: 当您必须首先访问数据时,如何获得值“ value1”和“ value2”? 如果字段位于根目录,那么我可以让该方法返回带有这些字段名称的POJO。 我基本上希望下面的工作。 问题答案: 您可以尝试以下代码,使用Gson库将json字符串转换为具有必填字段的Pojo对象。 或者,您可以定义嵌套的Pojo类来解析它。 编辑:尝试下面的代码以使用Retrofit

  • JSON 数据格式 JSON 是 JavaScript Object Notation 的简称,是一种轻量的数据表示方法。json格式采用key:value的方式记录数据,非常直观,比XML简洁,因而大受欢迎 介绍json格式前,先让我们看看XML格式。显然,XML 得到了相当多的关注(正面和负面的评价都有),已经在 Ajax 应用程序中广泛使用: <request> <firstName>