当前位置: 首页 > 知识库问答 >
问题:

Kafka连接自定义转换以将无模式Json转换为Avro

拓拔安邦
2023-03-14

我试图构建一个系统,从Kafka读取json数据(无模式),将其转换为avro并将其推送到s3。

我已经能够使用kstream和KSQL实现json到avro的转换。我想知道使用Kafka Connect的自定义转换是否可以实现同样的效果。

这是我迄今为止所尝试的:

public class JsontoAvroConverter<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String OVERVIEW_DOC = "Transform Payload to Custom Format";
    private static final String PURPOSE = "transforming payload";
    public static final ConfigDef CONFIG_DEF = new ConfigDef();
    @Override
    public void configure(Map<String, ?> props) {
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }

    @Override
    public R apply(R record) {

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("acks", "1");
        properties.setProperty("retries", "10");

        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
        properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");

        avro_Schema updatedSchema = makeUpdatedSchema();

        return newRecord(record, updatedSchema);
    }

    private avro_Schema makeUpdatedSchema() {
        avro_Schema.Builder avro_record = avro_Schema.newBuilder()
                .setName("test")
                .setTry$(1);

        return avro_record.build();
    }

    protected Object operatingValue(R record) {
        return record.value();
    }

    protected R newRecord(R record, avro_Schema updatedSchema) {
        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, record.value(), record.timestamp());
    }
}

其中avro_schema是avsc文件中指定的架构名称。

我不确定这是否是正确的方法,但我面临的问题是,当调用newRecord()函数时,它希望updatedSchema是模式类型,但我为其提供了一个自定义的avro\u模式类型。

此外,我保存到updatedSchema中的avro_record.build()并不是真正的模式,而是转换后的记录本身。但是我不能只将记录主题、键(=null)和updatedRecord传递给newRecord函数。它分别需要模式和值。

我的问题是:

  1. 甚至可以使用KafkanConnect将json转换为avro,而不使用KStream或KSQL吗因为这两种选择都需要设置独立的服务

我很抱歉,如果这个问题已经得到了回答,我确实问了一些其他问题,但似乎没有一个能回答我的疑问。如果你需要任何其他细节,请告诉我。非常感谢。

共有1个答案

裴浩歌
2023-03-14

KafkaConnect自定义转换器只需要向传入的JSON添加模式。接收器属性format.class=io.confluent.connect.s3.format.avro.AvroFormat将负责其余部分。

如果没有模式,记录值是一个映射,如果有模式,它将成为一个结构。我不得不修改我的代码如下:

    @Override
    public R apply(R record) {
        final Map<String,?> value = requireMap(record.value(),PURPOSE);
        Schema updatedSchema = makeUpdatedSchema();

        final Struct updatedValue = new Struct(updatedSchema);


        for (Field field : updatedSchema.fields()) {

            updatedValue.put(field.name(), value.get(field.name()));
        }


        return newRecord(record, updatedSchema, updatedValue);
    }

    private Schema makeUpdatedSchema() {
        final SchemaBuilder builder = SchemaBuilder.struct()
                .name("json_schema")
                .field("name",Schema.STRING_SCHEMA)
                .field("try",Schema.INT64_SCHEMA);

        return builder.build();
    }

感谢@OneCricketeer澄清了我的疑问!

 类似资料:
  • 我试图用以下格式转换一个JSON: 我已经计算了JSON,现在我需要使用evaluatejsonpath输出中的属性来获得以下格式: 它们是内置的处理器来完成此转换吗,还是需要在executionscript处理器中开发一个Python脚本?

  • 有一个网站这样做,但我想要一个图书馆或CLI。 谢了!

  • 我对大摇大摆的文档很陌生。我们有一个现有的项目正在为RESTFul Web服务开发进度语言。不同的基于资源的URL以Application/JSON格式使用和生成。下面给出了一个资源url的输入和输出json格式: 如何将上述请求和响应json格式转换为swagger 2.0 json格式? 谢了!

  • 问题内容: 是否有一个python库,用于将JSON模式转换为python类定义,类似于jsonschema2pojo- https : //github.com/joelittlejohn/jsonschema2pojo- 适用于Java? 问题答案: 到目前为止,我能够找到的最接近的东西是warlock,它可以宣传此工作流程: 建立架构 建立模型 使用模型创建对象 但是,这并不是那么容易。术士

  • 嗨,我在Python中将utf-8 json转换成unicode escape json时遇到了一些问题 我知道如何将utf-8.txt转换为unicodeescape.txt 但是,我在Python中使用json模块时遇到了上面应用的问题,如下所示 它保存得很好,但是,当涉及到json中的双引号(“)时,它会自动添加双反斜杠(\\),因此unicode-esc.json文件在调用python脚本

  • 对我来说,这似乎是如此基本和简单,但我在网上没有找到任何东西。 有许多关于如何获得JSON模式的示例,也有许多如何从以下对象创建mongoose模式的示例: 如果我试图直接放置JSON模式,我会得到一个错误 但是我在网上找不到从一种类型到另一种类型的转移。 以前有人有这个问题吗?