当前位置: 首页 > 知识库问答 >
问题:

Hortonworks模式注册表Nifi Java:反序列化Nifi记录

鲜于温书
2023-03-14

我正在尝试使用Hortonworks Schema注册表反序列化一些由Nifi序列化的Kafka消息

  • 在Nifi端用作RecordWriter的处理器:AvroRecordSetWriter

我能够在其他Nifi kafka消费者中反序列化这些消息。但是我正在尝试使用Kafka代码从我的Flink应用程序中反序列化它们。

我的Flink应用程序的Kafka反序列化程序处理程序中有以下内容:

final String SCHEMA_REGISTRY_CACHE_SIZE_KEY = SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_SIZE.name();
final String SCHEMA_REGISTRY_CACHE_EXPIRY_INTERVAL_SECS_KEY = SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name();
final String SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_SIZE_KEY = SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name();
final String SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS_KEY = SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name();
final String SCHEMA_REGISTRY_URL_KEY = SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name();

Properties schemaRegistryProperties = new Properties();
schemaRegistryProperties.put(SCHEMA_REGISTRY_CACHE_SIZE_KEY, 10L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_CACHE_EXPIRY_INTERVAL_SECS_KEY, 5000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_SIZE_KEY, 1000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS_KEY, 60 * 60 * 1000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_URL_KEY, "http://schema_registry_server:7788/api/v1");
return (Map<String, Object>) HWXSchemaRegistry.getInstance(schemaRegistryProperties).deserialize(message);

这是用于反序列化消息的HWXSchemaRealstryCode:

import com.hortonworks.registries.schemaregistry.avro.AvroSchemaProvider;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
import com.hortonworks.registries.schemaregistry.serdes.avro.AvroSnapshotDeserializer;

public class HWXSchemaRegistry {

    private SchemaRegistryClient client;
    private Map<String,Object> config;
    private AvroSnapshotDeserializer deserializer;
    private static HWXSchemaRegistry hwxSRInstance = null;

    public static HWXSchemaRegistry getInstance(Properties schemaRegistryConfig) {
        if(hwxSRInstance == null)
            hwxSRInstance = new HWXSchemaRegistry(schemaRegistryConfig);
        return hwxSRInstance;
    }

    public Object deserialize(byte[] message) throws IOException {

        Object o = hwxSRInstance.deserializer.deserialize(new ByteArrayInputStream(message), null);
        return o;
   }

    private static Map<String,Object> properties2Map(Properties config) {
        Enumeration<Object> keys = config.keys();
        Map<String, Object> configMap = new HashMap<String,Object>();
        while (keys.hasMoreElements()) {
            Object key = (Object) keys.nextElement();
            configMap.put(key.toString(), config.get(key));
        }
        return configMap;
     }

    private HWXSchemaRegistry(Properties schemaRegistryConfig) {
        _log.debug("Init SchemaRegistry Client");
        this.config = HWXSchemaRegistry.properties2Map(schemaRegistryConfig);
        this.client = new SchemaRegistryClient(this.config);

        this.deserializer = this.client.getDefaultDeserializer(AvroSchemaProvider.TYPE);
        this.deserializer.init(this.config);
     }
}

但我得到了一个404 HTTP错误代码(找不到模式)。我认为这是因为Nifi配置和HWX Schema Registry客户端实现之间的“协议”不兼容,所以客户端正在查找的模式标识符字节在服务器上不存在,或者类似的情况。

有人能帮忙吗?

非常感谢。

原因:javax。ws。rs.NotFoundException:在org上找不到HTTP 404。玻璃鱼。运动衫客户球衣。org上的convertToException(JerseyInvocation.java:1069)。玻璃鱼。运动衫客户球衣。在org上翻译(JerseyInvocation.java:866)。玻璃鱼。运动衫客户球衣。lambda$invoke$1(JerseyInvocation.java:750)位于org。玻璃鱼。运动衫内部的错误。org上的进程(Errors.java:292)。玻璃鱼。运动衫内部的错误。org上的进程(Errors.java:274)。玻璃鱼。运动衫内部的错误。org上的进程(Errors.java:205)。玻璃鱼。运动衫过程内部的请求范围。org上的runInScope(RequestScope.java:390)。玻璃鱼。运动衫客户球衣。在org上调用(JerseyInvocation.java:748)。玻璃鱼。运动衫客户球衣$Builder。方法(JerseyInvocation.java:404)。玻璃鱼。运动衫客户球衣$Builder。在com上获取(JerseyInvocation.java:300)。霍顿工厂。登记处。阴谋论。客户SchemaRegistrCyclient 14美元。在com上运行(SchemaRegistryClient.java:1054)。霍顿工厂。登记处。阴谋论。客户SchemaRegistrCyclient 14美元。在java上运行(SchemaRegistryClient.java:1051)。安全访问控制器。javax上的doPrivileged(本机方法)。安全啊。主题doAs(Subject.java:360)在com上。霍顿工厂。登记处。阴谋论。客户SchemaRegistryClient。getEntities(SchemaRegistryClient.java:1051)位于com。霍顿工厂。登记处。阴谋论。客户SchemaRegistryClient。com上的getAllVersions(SchemaRegistryClient.java:872)。霍顿工厂。登记处。阴谋论。客户SchemaRegistryClient。HWXSchemaRegistry上的getAllVersions(SchemaRegistryClient.java:676)。(HWXSchemaRegistry.java:56)在HWXSchemaRegistry。SchemaService上的getInstance(HWXSchemaRegistry.java:26)。在SchemaService上反序列化(SchemaService.java:70)。在org上反序列化(SchemaService.java:26)。阿帕奇。Flink。流动。连接器。Kafka。内部。Kafka德雷珀。在org上反序列化(KafkaDeserializationSchemaWrapper.java:45)。阿帕奇。Flink。流动。连接器。Kafka。内部的Kafka菲彻。runFetchLoop(KafkaFetcher.java:140)位于org。阿帕奇。Flink。流动。连接器。Kafka。弗林Kafka萨。在org上运行(FlinkKafkaConsumerBase.java:712)。阿帕奇。Flink。流动。应用程序编程接口。接线员。StreamSource。在org上运行(StreamSource.java:93)。阿帕奇。Flink。流动。应用程序编程接口。接线员。StreamSource。在org上运行(StreamSource.java:57)。阿帕奇。Flink。流动。运行时。任务。SourceStreamTask。在org上运行(SourceStreamTask.java:97)。阿帕奇。Flink。流动。运行时。任务。简化任务。在org上调用(StreamTask.java:302)。阿帕奇。Flink。运行时。任务经理。任务在java上运行(Task.java:711)。朗。丝线。运行(Thread.java:745)

共有1个答案

夏学名
2023-03-14

我找到了解决办法。因为我没法让它工作。我使用字节数组的第一个字节对schema registry进行多次调用,然后使用avro模式对字节数组的其余部分进行反序列化。

>

import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionKey;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;

try(SchemaRegistryClient client = new SchemaRegistryClient(this.schemaRegistryConfig)) {
    try {
        Long schemaId = ByteBuffer.wrap(Arrays.copyOfRange(message, 1, 9)).getLong();
        Integer schemaVersion =  ByteBuffer.wrap(Arrays.copyOfRange(message, 9, 13)).getInt();


        SchemaMetadataInfo schemaInfo = client.getSchemaMetadataInfo(schemaId);
        String schemaName = schemaInfo.getSchemaMetadata().getName();

        SchemaVersionInfo schemaVersionInfo = client.getSchemaVersionInfo(
                new SchemaVersionKey(schemaName, schemaVersion));   


        String avroSchema = schemaVersionInfo.getSchemaText();
        byte[] message= Arrays.copyOfRange(message, 13, message.length);
        // Deserialize [...]
    } 
    catch (Exception e) 
    {
        throw new IOException(e.getMessage());
    }
}

我还认为,在调用hwxSRInstance之前,可能必须删除第一个字节。反序列化程序。在我的问题代码中,反序列化,因为这个字节似乎是Nifi处理器之间通信的Nifi特定字节,但它不起作用。

下一步是使用模式文本构建缓存,以避免多次调用模式注册表API。

新信息:我将扩展我的答案,以包括avro反序列化部分,因为这对我来说是一些故障排除,我必须检查Nifi avro阅读器源代码,以找出这一部分(我在尝试使用基本avro反序列化代码时遇到无效的avro数据异常):

import org.apache.avro.Schema;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

private static GenericRecord deserializeMessage(byte[] message, String schemaText) throws IOException {

    InputStream in = new SeekableByteArrayInput(message);
    Schema schema = new Schema.Parser().parse(schemaText);
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in,  null);
    GenericRecord genericRecord = null;
    genericRecord = datumReader.read(genericRecord, decoder);
    in.close();

    return genericRecord;
}

如果要将GenericRecord转换为map,请注意字符串值不是字符串对象,需要强制转换字符串类型的键和值:

private static Map<String, Object> avroGenericRecordToMap(GenericRecord record)
{
    Map<String, Object> map = new HashMap<>();
    record.getSchema().getFields().forEach(field -> 
        map.put(String.valueOf(field.name()), record.get(field.name())));

    // Strings are maped to Utf8 class, so they need to be casted (all the keys of records and those values which are typed as string)
    if(map.get("value").getClass() ==  org.apache.avro.util.Utf8.class)
        map.put("value", String.valueOf(map.get("value")));

    return map;
}
 类似资料:
  • 我一直在尝试将avro通用记录进行串行化,并生成avro串行化的数据发送给Kafka。主要目标是不使用合并模式注册表存储模式,而是将模式与序列化数据一起发送,以便从kafka主题中提取并反序列化。 下面是AvroSerializer用于生成Avro数据的部分。 Kafka中出现的序列化数据如下所示。

  • 我是flink和kafka的新手。我正在尝试使用合流模式注册表对avro数据进行反序列化。我已经在ec2机器上安装了flink和kafka。此外,在运行代码之前已经创建了“测试”主题。 代码路径:https://gist.github.com/mandar2174/5dc13350b296abf127b92d0697c320f2 作为实现的一部分,该代码执行以下操作: 运行flink可执行jar的

  • 我现在一直在查看Spring Cloud模式注册表和汇合模式注册表。我可以看到一些区别,例如Spring Cloud模式注册表将模式保存在普通数据库中,默认情况下保存在h2中,而汇合模式注册表保存在kafka主题中。 spring云模式注册表的这种方法是否会对性能产生任何影响。据我所知,即使数据保留在主题上,以防汇合,查询它时仍然会有延迟。但会有重大影响吗? 我还可以看到,spring云模式注册表

  • 我正在了解Confluent的模式注册表,以满足所有模式管理需求。 我不太理解他们的版本控制方法...有一个的概念,我将其视为一个名称空间。据我所知,subject在模式注册表中必须是唯一。 然后是模式id,或者只是,它也是唯一的。 最后,还有一个。 以下是文档中的片段: :此主题的架构版本,每个主题从1开始 :全局唯一的架构版本id,在所有主题中的所有架构中都是唯一的 因此,一旦我想修改特定主题

  • 我们希望序列化Java类的模式,以便任何字段或类上存在的所有注释也序列化到模式中。 我没有找到这样做的工具。 Avro不处理非字符串映射键,FasterXML不处理循环引用。它们都不会将注释序列化到模式中。 是否有任何Java JSON(反)序列化程序可以做到这一点?

  • 我使用来自Confluent的Kafka Connect来使用Kafka流并以拼花格式写入HDFS。我正在1个节点中使用架构注册表服务,它运行良好。现在我想将模式注册表分发到集群模式以处理故障转移。关于如何实现这一点的任何链接或片段都将非常有用。