当前位置: 首页 > 知识库问答 >
问题:

在合流Kafka中,关键的反序列化在消费者阶层中意味着什么?

萧心远
2023-03-14

confluent kafka文档说,Consumer类定义如下:

Class Consumer<TKey, TValue> 

上面的consumer类实现了一个高级Apache Kafka consumer(具有键和值反序列化)。

我知道TKey和TValue用于反序列化从生产者发送的密钥。例如,类似于

从生产者发送密钥将看起来像

var deliveryReport = producer.ProduceAsync(topicName, key, val);

在消费者端接收string键将显示为

using (var consumer = new Consumer<Ignore, string>(constructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8)))
{
    consumer.Subscribe(topics);

    Console.WriteLine($"Started consumer, Ctrl-C to stop consuming");

    var cancelled = false;
    Console.CancelKeyPress += (_, e) => {
        e.Cancel = true; // prevent the process from terminating.
        cancelled = true;
    };

    while (!cancelled)
    {
        Message<Ignore, string> msg;
        if (!consumer.Consume(out msg, TimeSpan.FromMilliseconds(100)))
        {
            continue;
        }

        Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
    }
}
Consumer<Ignore, string>

并将消息初始化为

Message<Ignore, String>

在所有这些之后,我的问题是,密钥的反序列化真正意味着什么?我们为什么要这么做?另外,为什么我们需要传入一个键-值对Ignore,字符串来执行反序列化?

共有1个答案

慕高格
2023-03-14

为什么我们需要传入一个键值对Ignore,字符串来执行反序列化?

您不需要传递这些特定设置。您需要匹配制作人的设置。或者,如果您不确定,您可以为键和值提供byte array object。

如果生产者没有发送密钥,例如null,则没有任何东西可以反序列化。我假设这就是Ignore类的用途。注意,您没有提供键反序列化器类,但为值提供了

null, new StringDeserializer(Encoding.UTF8))

默认情况下,键决定您将从主题的哪个分区中获取消息。空键将在主题中平均分配。否则,生产者应用程序可以定义它们自己的分区,并在它们的逻辑决定的任何地方发送数据

 类似资料:
  • 我们正在考虑在我们的for消息传递中使用Kafka,我们的应用程序是使用Spring开发的。所以,我们已经计划用Spring-Kafka。 生产者将消息作为HashMap对象放入队列。我们有JSON序列化器,并且假设映射将被序列化并放入队列。这是生产者配置。 我们看到的文章很少,建议是这样做: 我们不想为创建反序列化程序编写一些代码。有没有我们缺少的样板?任何帮助都将不胜感激!!

  • 我创建了一个简单的生产者-消费者应用程序,使用自定义序列化器和反序列化器。 在我生成的Message类中添加了一个新方法之后,使用者开始在反序列化时被堆栈。我的生产者使用的是新类(有新方法),消费者使用的是旧类(没有方法)。 JSONSerializer/Deserializer可以处理这些类型的修复吗?如果我要使用JSONSerialzier,它应该只关心类的模式,对吗?

  • 问题内容: 我有一个带有和对象的简单模型。 一个问题有很多选择。 许多选择有一个问题 有两种使用Hibernate来实现的方法 实施一:所有者是选择 Question.java Choice.java 实施二:所有者方是质询 Question.java Choice.java 两种实现之间有什么区别? 问题答案: 第一个示例是正常且正确的双向一对多/多对一映射。设置为-attribute(“拥有方

  • 我在 angular4 中使用。为此,我创建了 类,该类实现了 接口并提供方法的定义。然后注册提供商进行这样的 这很好。但是我还是不明白这里的< code>multi:true是什么意思?我看过这个回答。 如果我删除了< code>multi:true选项,浏览器端就会出现错误 这是否意味着<code>HTTP_INTERCEPTORS</code>是初始化多个指令或组件的多提供者?如果是,那么还

  • 问题内容: $(‘button’).click(function () { 我的问题:尽管我打了个电话,为什么它仍然会提醒下一个号码?就像:忽略下面的代码,然后继续下一个元素 问题答案: 除了引发异常外,没有其他方法可以停止或中断循环。如果您需要这种行为,该方法是错误的工具。 提前终止可以通过以下方式完成: 一个简单的循环 一… 圈 另一个阵列的方法:,,,和测试使用谓词返回truthy值的数组元

  • 我有一个spring boot项目,我是spring-kafka来连接底层的kafka事件枢纽。 我不得不在同一节消费者课上听2个不同的话题。我有两种方法可以这样做。 一个是要有两个这样的Kafka听众: 另一种方法是在同一个kafkaListener中有两个主题,如下所示 ===================edit===============application.yml中的Kafka属性