当前位置: 首页 > 知识库问答 >
问题:

如何从带有十进制类型列的主题在ksql中创建流

端木野
2023-03-14

我想从kafka topic创建一个流,用于监视mysql表。mysql表具有十进制(16,4)类型的列,当我使用此命令创建流时:create stream test with(KAFKA\u TOPIC='dbServer.KAFKA.DailyUdr',VALUE\u FORMAT='AVRO') 创建并运行流,但结果流中不显示十进制(16,4)类型的列。



    source topic value schema:
    {
      "type": "record",
      "name": "Envelope",
      "namespace": "dbServer.Kafka.DailyUdr",
      "fields": [
        {
          "name": "before",
          "type": [
            "null",
            {
              "type": "record",
              "name": "Value",
              "fields": [
                {
                  "name": "UserId",
                  "type": "int"
                },
                {
                  "name": "NationalCode",
                  "type": "string"
                },
                {
                  "name": "TotalInputOcted",
                  "type": "int"
                },
                {
                  "name": "TotalOutputOcted",
                  "type": "int"
                },
                {
                  "name": "Date",
                  "type": "string"
                },
                {
                  "name": "Service",
                  "type": "string"
                },
                {
                  "name": "decimalCol",
                  "type": [
                    "null",
                    {
                      "type": "bytes",
                      "scale": 4,
                      "precision": 16,
                      "connect.version": 1,
                      "connect.parameters": {
                        "scale": "4",
                        "connect.decimal.precision": "16"
                      },
                      "connect.name": "org.apache.kafka.connect.data.Decimal",
                      "logicalType": "decimal"
                    }
                  ],
                  "default": null
                }
              ],
              "connect.name": "dbServer.Kafka.DailyUdr.Value"
            }
          ],
          "default": null
        },
        {
          "name": "after",
          "type": [
            "null",
            "Value"
          ],
          "default": null
        },
        {
          "name": "source",
          "type": {
            "type": "record",
            "name": "Source",
            "namespace": "io.debezium.connector.mysql",
            "fields": [
              {
                "name": "version",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "connector",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "server_id",
                "type": "long"
              },
              {
                "name": "ts_sec",
                "type": "long"
              },
              {
                "name": "gtid",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "file",
                "type": "string"
              },
              {
                "name": "pos",
                "type": "long"
              },
              {
                "name": "row",
                "type": "int"
              },
              {
                "name": "snapshot",
                "type": [
                  {
                    "type": "boolean",
                    "connect.default": false
                  },
                  "null"
                ],
                "default": false
              },
              {
                "name": "thread",
                "type": [
                  "null",
                  "long"
                ],
                "default": null
              },
              {
                "name": "db",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "table",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "query",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              }
            ],
            "connect.name": "io.debezium.connector.mysql.Source"
          }
        },
        {
          "name": "op",
          "type": "string"
        },
        {
          "name": "ts_ms",
          "type": [
            "null",
            "long"
          ],
          "default": null
        }
      ],
      "connect.name": "dbServer.Kafka.DailyUdr.Envelope"
    }

我的问题在decimalCol列


共有1个答案

隆向晨
2023-03-14

KSQL尚不支持DECIMAL数据类型。

这里有一个问题,如果你认为有用的话,你可以跟踪并投票。

 类似资料:
  • 问题内容: 我正在调用一种称为“ calculateStampDuty”的方法,该方法将返回要在财产上支付的印花税额。百分比计算工作正常,并返回正确的值“ 15000.0”。但是,我想只向前端用户显示该值“ 15000”,因此只想删除其后的小数和任何先前的值。如何才能做到这一点?我的代码如下: 我尝试了以下方法: 创建一个新的字符串,该字符串将double值转换为字符串,如下所示: 我已经尝试在S

  • 我调用了一个名为“CalculateStampTax”的方法,该方法将返回要支付的房产印花税金额。百分比计算工作正常,并返回正确的值“15000.0”。但是,我想向前端用户显示的值仅为“15000”,因此只想删除小数点和其后的任何前面的值。如何做到这一点?我的代码如下: 我尝试了以下方法: > 创建一个将双精度值转换为字符串的新字符串,如下所示: 我尝试在String对象上使用“value eOf

  • 我是ANTLR的新手,并使用ANTLR4(4.7.2 Jar文件)。我目前正在研究Oracle解析器。我对十进制数字有问题。我只保留了相关的部分。我的语法文件如下。 我该怎么做?我尝试过使用_input.la(-1)!='.'}?等,但对我来说并不正确。我尝试了提到的许多其他步骤(大多数解决方案适用于ANTLR3,但不适用于ANTLR4)。在lexer中有没有一种简单的方法可以做到这一点?我不想编

  • 晚上好,有经验的人可以告诉我如何制作一个像下面这样的pnel吗?我想制作一个面板,并在里面嵌入一个,这是可能的吗?如果不是,那么如何将包装在下面这样的内容中呢?下面是我想要创建一个类似于它的实际面板: 注意:我尝试了,但我认为它与图像中的不同

  • 如何避免在HTML5中输入数字的十进制值。目前,它允许用户键入十进制值。

  • 我想知道如何将舞蹈方法的十进制结果包含到灯光方法中。例如,在这个程序中,如果我输入5F,十进制结果将是95。我希望95在light方法中显示为静态int变量,以便转换为二进制数。如果你能告诉我如何将十六进制数限制在2位数以内,那也会很有帮助。感谢阅读! } }