我试图构建一个系统,从Kafka读取json数据(无模式),将其转换为avro并将其推送到s3。
我已经能够使用kstream和KSQL实现json到avro的转换。我想知道使用Kafka Connect的自定义转换是否可以实现同样的效果。
这是我迄今为止所尝试的:
public class JsontoAvroConverter<R extends ConnectRecord<R>> implements Transformation<R> {
public static final String OVERVIEW_DOC = "Transform Payload to Custom Format";
private static final String PURPOSE = "transforming payload";
public static final ConfigDef CONFIG_DEF = new ConfigDef();
@Override
public void configure(Map<String, ?> props) {
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
@Override
public void close() {
}
@Override
public R apply(R record) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("acks", "1");
properties.setProperty("retries", "10");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
avro_Schema updatedSchema = makeUpdatedSchema();
return newRecord(record, updatedSchema);
}
private avro_Schema makeUpdatedSchema() {
avro_Schema.Builder avro_record = avro_Schema.newBuilder()
.setName("test")
.setTry$(1);
return avro_record.build();
}
protected Object operatingValue(R record) {
return record.value();
}
protected R newRecord(R record, avro_Schema updatedSchema) {
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, record.value(), record.timestamp());
}
}
其中avro_schema是avsc文件中指定的架构名称。
我不确定这是否是正确的方法,但我面临的问题是,当调用newRecord()函数时,它希望updatedSchema是模式类型,但我为其提供了一个自定义的avro\u模式类型。
此外,我保存到updatedSchema中的avro_record.build()并不是真正的模式,而是转换后的记录本身。但是我不能只将记录主题、键(=null)和updatedRecord传递给newRecord函数。它分别需要模式和值。
我的问题是:
我很抱歉,如果这个问题已经得到了回答,我确实问了一些其他问题,但似乎没有一个能回答我的疑问。如果你需要任何其他细节,请告诉我。非常感谢。
KafkaConnect自定义转换器只需要向传入的JSON添加模式。接收器属性format.class=io.confluent.connect.s3.format.avro.AvroFormat将负责其余部分。
如果没有模式,记录值是一个映射,如果有模式,它将成为一个结构。我不得不修改我的代码如下:
@Override
public R apply(R record) {
final Map<String,?> value = requireMap(record.value(),PURPOSE);
Schema updatedSchema = makeUpdatedSchema();
final Struct updatedValue = new Struct(updatedSchema);
for (Field field : updatedSchema.fields()) {
updatedValue.put(field.name(), value.get(field.name()));
}
return newRecord(record, updatedSchema, updatedValue);
}
private Schema makeUpdatedSchema() {
final SchemaBuilder builder = SchemaBuilder.struct()
.name("json_schema")
.field("name",Schema.STRING_SCHEMA)
.field("try",Schema.INT64_SCHEMA);
return builder.build();
}
感谢@OneCricketeer澄清了我的疑问!
我试图用以下格式转换一个JSON: 我已经计算了JSON,现在我需要使用evaluatejsonpath输出中的属性来获得以下格式: 它们是内置的处理器来完成此转换吗,还是需要在executionscript处理器中开发一个Python脚本?
有一个网站这样做,但我想要一个图书馆或CLI。 谢了!
我对大摇大摆的文档很陌生。我们有一个现有的项目正在为RESTFul Web服务开发进度语言。不同的基于资源的URL以Application/JSON格式使用和生成。下面给出了一个资源url的输入和输出json格式: 如何将上述请求和响应json格式转换为swagger 2.0 json格式? 谢了!
问题内容: 是否有一个python库,用于将JSON模式转换为python类定义,类似于jsonschema2pojo- https : //github.com/joelittlejohn/jsonschema2pojo- 适用于Java? 问题答案: 到目前为止,我能够找到的最接近的东西是warlock,它可以宣传此工作流程: 建立架构 建立模型 使用模型创建对象 但是,这并不是那么容易。术士
嗨,我在Python中将utf-8 json转换成unicode escape json时遇到了一些问题 我知道如何将utf-8.txt转换为unicodeescape.txt 但是,我在Python中使用json模块时遇到了上面应用的问题,如下所示 它保存得很好,但是,当涉及到json中的双引号(“)时,它会自动添加双反斜杠(\\),因此unicode-esc.json文件在调用python脚本
对我来说,这似乎是如此基本和简单,但我在网上没有找到任何东西。 有许多关于如何获得JSON模式的示例,也有许多如何从以下对象创建mongoose模式的示例: 如果我试图直接放置JSON模式,我会得到一个错误 但是我在网上找不到从一种类型到另一种类型的转移。 以前有人有这个问题吗?