当前位置: 首页 > 知识库问答 >
问题:

如何配置Kafka密钥序列化器,对于有时密钥是长的,有时密钥是字符串的情况?

章哲茂
2023-03-14

我有kafka producer配置,在此之前,我将密钥作为字符串类型发送,并配置了密钥序列化器,如下所示,

configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class.getName());

但是我们有一个要求,一些时间键可能也很长,所以在不创建另一个生产者配置的情况下,我们可以像ObjectSerializer那样配置上面的东西吗?有这样的礼物吗?

我的spring-kafka版本是2.1,所以我不能使用DelegatingSerializer,也不能升级项目中的版本。

更新1:

我创建了像下面这样的自定义序列化器,并将其配置为我的密钥序列化器,但是在发布了带有一些长值的消息作为密钥,并且当我看到密钥包含一些垃圾符号时,如果有任何错误,可以更正下面的代码吗。

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.UnsupportedEncodingException;
import java.util.Map;

public class LongStringSerializer implements Serializer<Object> {

    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        if (data == null)
            return null;

        if (data instanceof Long) {
            return serialize(Long.parseLong(data.toString()));
        }

        if (data instanceof String) {
            return serializeStringData((String) data);
        }
        return new byte[0];
    }

    private byte[] serializeStringData(String data) {
        try {
            return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    private byte[] serialize(Long data) {
        if (data == null)
            return null;

        return new byte[] {
                (byte) (data >>> 56),
                (byte) (data >>> 48),
                (byte) (data >>> 40),
                (byte) (data >>> 32),
                (byte) (data >>> 24),
                (byte) (data >>> 16),
                (byte) (data >>> 8),
                data.byteValue()
        };
    }

    @Override
    public void close() {
        // nothing to do
    }
}

共有1个答案

公西季
2023-03-14

请参阅委派序列化程序。

版本2.3引入了DelegatingSerializer和DelegatingDeserializer,它们允许生成和使用具有不同键和/或值类型的记录。生产者必须将头DelegatingSerializer.Serialization_Selector设置为用于选择要使用的序列化程序的选择器值;如果未找到匹配项,则引发IllegalStateException。

它需要一个头来告诉它使用哪个序列化器,但是您可以编写一个简单的版本,只查看键类型。

(我想我将增强delegatingserializer来实现这一点)。

另一种选择是使用不同的生产者为每一个。

在2.5版中,您可以覆盖Kafkatemplate上的生产者工厂属性:

java prettyprint-override">    /**
     * Create an instance using the supplied producer factory and properties, with
     * autoFlush false. If the configOverrides is not null or empty, a new
     * {@link DefaultKafkaProducerFactory} will be created with merged producer properties
     * with the overrides being applied after the supplied factory's properties.
     * @param producerFactory the producer factory.
     * @param configOverrides producer configuration properties to override.
     * @since 2.5
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory, @Nullable Map<String, Object> configOverrides) {
 类似资料:
  • 我使用用户提供的32字节秘密密钥使用HMAC-256对一些数据进行签名。我还希望我的应用程序使用AES-192加密数据。我应该向用户要求另一个秘密密钥(这次是16字节大小),还是有一种安全的方法从另一个32字节密钥中导出16字节的强秘密密钥?第二种方法会使应用程序配置更容易一点。有什么指导方针或提示吗?或者这种方法完全是胡说八道?

  • 问题内容: 我有一个主键为varchar(255)的表。在某些情况下,255个字符不够用。我尝试将字段更改为文本,但是出现以下错误: 我怎样才能解决这个问题? 编辑:我还应该指出,该表具有包含多个列的复合主键。 问题答案: 发生错误是因为MySQL只能索引BLOB或列的前N个字符。所以错误主要发生时,有一个领域/列类型或BLOB或那些属于或类型,如,,,,,和您尝试使一个主键或索引。无论长度是完整

  • 那是什么绳子?它看起来像ASCII,但我缺乏关于编码的深刻知识。我需要任何类型的转换/解码吗? 我写了一个小程序来打开一个文件并解密它。但是PyCrypto抛出了一个错误,我花了5个小时反复试验,没有任何进展: 所以我两个都试了: null 问候AFX

  • 如何在Azure密钥库中设置秘密,而不使用PowerShell。我们正在使用Azure Key Vault来安全地存储连接字符串和一些其他应用程序秘密。我们可以使用PowerShell脚本添加秘密,但我想知道是否有其他方法可以在Azure KeyVault中添加密钥,最好是使用API。我们实际上需要提供一个管理工具,应用程序管理员可以使用该工具在密钥库中添加/修改机密。

  • 这是我的密码 抱歉,如果我的代码一团糟。

  • 我用过这个命令 生成密钥库。它工作正常,但从我读到的内容来看,这个命令还应该提示您输入密钥密码(而不是存储密码)?我从来没有收到过这样的提示。我能跑 查看密钥库的内容。钥匙似乎就在那里。。。正确的别名在那里。在哪里获取/设置特定别名的密码? 我有一个key.properties在Android目录 在build.gradle我有: 当我试图生成一个发布版本时,我得到了 我想它可能与keyPassw