当前位置: 首页 > 知识库问答 >
问题:

提取和转换jdbc接收器连接器的kafka消息特定字段

葛宪
2023-03-14

我有一个kafka主题,它是使用Debezium mysql source connector从mysql数据库获取数据,下面是其中一条消息的格式:

{
    "Message": {
        "schema": {
            "type": "struct",
            "fields": [
              ...
            ],
            "optional": true,
            "name": "mysql-server-1.inventory.somename"
        },
        "payload": {
            "op": "u",
            "ts_ms": 1465491411815,
            "before": {
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Doof",
                "email": "annek@noanswer.org"
            },
            "after": {
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Marry",
                "email": "annek@noanswer.org"
            },
            "source": {
                "db": "inventory",
                "table": "customers",
                ...
                "query": "Update customers set last_name = 'Marry' where id = 1004"
            }
        }
    }
}

我想使用jdbc接收器连接器将ts_ms,before,afterid(从对象/行)列推送到另一个数据库中,表模式(id,before(text),after(text),timestamp),对kafka来说,我不知道:

>

  • 我如何才能只提取这些字段,从消息中推送而忽略其他字段?

    如何将before、after字段转换为字符串/序列化格式?

    如何从对象提取id?(如果是insert操作,则before为空,如果是delete操作,则after为空)

    对于上面的消息,接收器目标表的末尾应该有如下所示的数据:

    id:     1004
    before: '{"id":1004,"first_name":"Anne","last_name":"Doof","email":"annek@noanswer.org"}'
    after:  '{"id":1004,"first_name":"Anne","last_name":"Marry","email":"annek@noanswer.org"}'
    timestamp: 1465491411815
    
  • 共有2个答案

    王才
    2023-03-14

    您可以为您的json有效负载创建一个DTO(您从kafka主题中获得的Java对象),利用这个联机转换器可以帮助您将json转换为Java对象。[http://pojo.sodhanalibrary.com/][1]

    一旦从kafka主题接收到消息,就可以使用objectmapper转换json并将其映射到适当的DTO对象。您可以通过调用getId()、getBefore()等来使用该对象提取所需的字段,

    下面是一些参考代码,可以帮助您理解:

        @KafkaListener(topics = "test")
            public void listen(String payload)  {
    
                logger.info("Message Received from Kafka topic: {}", payload);
    
                ObjectMapper objectMapper = new ObjectMapper();
                objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    
                DTOObject dtoObject = objectMapper.readValue(payload,DTOObject.class);
    
                    logger.info("After Convertion: {}", objectMapper.writeValueAsString(dtoObject));
    
                    logger.info("Get Before:{}", dtoObject.getId());
    
    
    
            }
    
    司马宏茂
    2023-03-14

    您可以使用卡夫卡连接转换链,就像这个解决方案。

     类似资料:
    • 我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连

    • 用例是将整个消息(JSON)和键作为记录存储在表中,表中有两列“id”和“data”。 数据库是Postgres,它支持JSON的列类型。 根据本文,JSONCon的支持类型是字符串、int64等https://cwiki.apache.org/confluence/display/KAFKA/KIP-301:JsonConver的模式推理 是否可以将数据字段的类型设置为JSON,然后将其存储在P

    • 我正在尝试从kafka中的主题将数据插入postgres数据库。我正在使用以下命令加载 sink-quick start-MySQL . properties如下 我得到的错误是 Postgres jar文件已经在文件夹中。有人能提出建议吗?

    • 我有Kafka主题,有多种类型的消息流入并使用Kafka Connect写入弹性搜索。流看起来不错,直到我不得不将唯一的消息集分离到唯一的索引中。也就是说,我必须根据字段(JSON消息)为新的数据集获取新的索引。 我如何配置/定制Kafka connect以实现同样的功能?每个消息都包含一个表示消息类型和时间戳的字段。 示例 Json 如下所示: Sample1: {“log”:{“data”:“

    • 我有一个Kafka连接接收器记录从Kafka主题到S3。它在工作,但太慢了。Kafka主题每秒接收约30000条消息。连接接收器无法跟上。我已经尝试增加Kafka连接器的任务。最大值从1到3,这会创建更多任务,但这似乎无助于提高消息/秒的速度。我试着增加Kafka连接工人的CPU分配,这似乎也没有帮助。 我还能试什么?哪些指标有助于监控以进一步识别瓶颈? 更新:Kafka主题有5个分区。Kafka

    • 我有一个kafka主题,有200万条消息,我的刷新大小是100000,默认分区为分布式模式,有4个工作者,我可以看到数据在几秒钟内立即写入HDFS(10到15秒)。 我看到创建了一个+tmp目录和文件夹,并且每次触发一个新连接器时都会创建主题。 kafka connect的行为是每次都写得这么快,还是已经将数据存储在HDFS中,并根据连接器属性将其移动到主题目录? 我需要清楚这是怎么发生的。如果我