当前位置: 首页 > 知识库问答 >
问题:

在kafka消息有效负载中添加时间戳

蔺劲
2023-03-14

有没有办法在Kafka消息有效载荷中添加时间戳标头?我想检查消息是何时在消费者端创建的,并基于此应用自定义逻辑。

编辑:

我试图找到一种方法,将一些自定义值(基本上是时间戳)附加到生产者发布的消息上,这样我就可以在特定的时间段内消费消息。现在Kafka只确保消息将按照它们被放入队列的顺序传递。但是在我的例子中,先前生成的记录可能在某个延迟之后到达(因此在时间T1生成的消息可能比在稍后时间T2生成的具有偏移0的消息具有更高的偏移1)。由于这个原因,它们在消费者端将不会按照我期望的顺序排列。所以我基本上是在寻找一种有序消费它们的方法。

当前的Kafka 0.8版本不提供在生产者端附加除“消息密钥”之外的任何东西的方法,在这里发现了一个类似的主题,建议在消息有效载荷中对其进行编码。但我做了很多搜索,但找不到可能的方法。

此外,我不知道这种方法是否对Kafka的整体性能有任何影响,因为它在内部管理消息偏移,而且到目前为止,从该页面可以看到没有这样的API

真的很感激任何线索,如果这完全是正确的思考方式,或者有任何可能的方法,我都准备尝试一下

共有3个答案

祝宾白
2023-03-14

这看起来将帮助您实现目标。它允许您轻松定义和编写消息标头,隐藏(反)序列化负担。您唯一需要提供的是您通过线路发送的实际对象的(反)序列化器。这种实现实际上尽可能地延迟了有效负载对象的反序列化过程,这意味着您可以(以一种非常高效和透明的方式)反序列化标头,检查时间戳,并且只有在您确定对象对您有用时才反序列化有效负载(重位)。

宰坚
2023-03-14

您可以创建一个类,其中包含您的分区信息和创建此消息时的时间戳,然后将其用作Kafka消息的键。然后,您可以使用包装器Serde将此类转换为字节数组并返回,因为Kafka只能理解字节。然后,当您在消费者端以字节包的形式收到消息时,您可以反序列化它并检索时间戳,然后将其导入您的逻辑。

例如:

public class KafkaKey implements Serializable {
    private long mTimeStampInSeconds;
    /* This contains other partitioning data that will be used by the
    appropriate partitioner in Kafka. */
    private PartitionData mPartitionData;

    public KafkaKey(long timeStamp, ...) {
        /* Initialize key */
        mTimeStampInSeconds = timestamp;
    }

    /* Simple getter for timestamp */
    public long getTimeStampInSeconds() {
        return mTimeStampInSeconds;
    }

    public static byte[] toBytes(KafkaKey kafkaKey) {
        /* Some serialization logic. */
    }

    public static byte[] toBytes(byte[] kafkaKey) throws Exception {
        /* Some deserialization logic. */
    }
}

/* Producer End */

KafkaKey kafkaKey = new KafkaKey(System.getCurrentTimeMillis(), ... );
KeyedMessage<byte[], byte[]> kafkaMessage = new KeyedMessage<>(topic, KafkaKey.toBytes(kafkaKey), KafkaValue.toBytes(kafkaValue));

/* Consumer End */
MessageAndMetadata<byte[],byte[]> receivedMessage = (get from consumer);
KafkaKey kafkaKey = KafkaKey.fromBytes(receivedMessage.key());

long timestamp = kafkaKey.getTimeStampInSeconds();
/*
 * And happily ever after */

这将比使特定分区与时间间隔相对应更灵活。否则,您将不得不继续为不同的时间范围添加分区,并保持一个单独的、同步的表格,显示哪个分区对应于哪个时间范围,这可能会很快变得笨拙。

金旺
2023-03-14

如果您想在特定时间段内使用消息,那么我可以为您提供一个解决方案,但是从该时间段开始以有序的方式使用消息是很困难的。我也在寻找同样的解决方案。查看下面的链接

Kafka Qqueue中的消息排序

获取特定时间数据的解决方案

对于时间 T1,T2,...TN ,其中 T 是时间范围;将主题划分为 N 个分区数。现在使用分区程序类生成消息,以便应使用消息生成时间来决定应将此消息使用哪个分区。

同样,在使用时,请订阅要使用的时间范围的确切分区。

 类似资料:
  • 我可以在Mule Esb中看到两个不同的对象-消息和有效负载。但我无法理解两者的实际特征。有人能帮我理解一下吗?。

  • 我正在尝试使用亚马逊SNS控制台中的发布endpoint将推送通知(PN)从我的应用服务器发送到android设备,该消息和消息结构为json,工作正常。 但是,当我试图实现相同的Java它的设备没有收到通知。 控制台上的响应 发布列表请求:{ target arn:arn:AWS:SNS:AP-south-1:818862955266:endpoint/GCM/Test app/a1ec 811

  • 我正在使用apache camel(Fuse 2.10.x)和soap over http和soap over JMS。JMS消息由对象消息转换为字节消息格式,这就造成了消息读取的问题。 我正在JBoss5.0GA环境中使用用于websphere MQ的JNDI连接。 我们遇到了IBM属性的另一个问题,通过删除属性解决了这个问题。我们还有camel header属性来设置消息

  • 我使用spring kafka 2.1.7来使用JSON消息,我想处理无法正确反序列化的消息<为了覆盖在同一条消息上循环的默认行为,我扩展了JsonDeserializer来覆盖反序列化方法。 这是我的消费者及其配置: 最后,我实现了自己的错误处理程序,以便将错误数据发送到其他主题。 这是当我使用错误消息时发生的情况: CustomKafkaJsonDeserializer尝试反序列化消息并捕获异

  • 我试图理解如何在Kafka源代码的水印策略中使用withTimestampAssigner()。我需要使用的“时间”在消息负载内。 为此,我有以下代码: 其中EventDeserializationSchema()是: 和事件: 我想了解的是如何为withTimeStampAssigner()提供时间: 变量应该是Event.time但从flink页面我不太明白。 我一直在寻找 这让我有点困惑,因

  • 在定义消息有效负载时的Firebase云消息文档中: 通过使用数据和/或通知键创建对象,可以指定一种或两种消息类型。 文档给出了组合消息的示例: 另请参阅后台应用程序中处理通知消息的文档: 这包括同时包含通知和数据有效负载的消息(以及从通知控制台发送的所有消息)。在这些情况下,通知会发送到设备的系统托盘,数据有效负载会在启动器活动的目的之外发送。 我用这个有效载荷发送通知: 但是始终为空: 我做错