当前位置: 首页 > 知识库问答 >
问题:

是否可以从kafka消息中获取消息密钥的最新值

霍书
2023-03-14

假设我对同一个消息键有不同的值。

例如:

{
userid: 1,
email: user123@xyz.com }

{
userid: 1,
email: user456@xyz.com }

{
userid: 1,
email: user789@xyz.com }

在上述情况下,我只需要用户更新的最新值,即“user789@xyz.com”。

我的kafka流应该只给我第三个值,而不是前两个值。

共有3个答案

鲜于裕
2023-03-14

您需要Kafka日志压缩。简而言之,如果您希望您的主题只保留特定键的最后一个值,您应该设置属性log.cleanup.policy=压缩。您可以在此处找到有关它的更多信息。

方鸿振
2023-03-14

似乎您希望在进一步处理之前缓冲记录。因为在流媒体中,你有不断增长、无限的数据集,所以你永远不知道是要等待更多的记录,还是要刷新缓冲区以进行进一步处理。你能补充更多关于如何处理这些记录的细节吗?

您可以引入一个附加参数,即刷新缓冲区之前要等待的最长时间。要对此进行归档,您可以使用会话窗口或滚动窗口,或者使用与提交间隔相关联的记录缓存,或者也可以使用Kafka低级别处理器API实现它。

下面的示例代码显示了如何使用翻滚窗口对此进行存档,以在1小时的时间窗口内聚合和抑制所有用户ID信息,接受延迟10分钟的事件,然后将抑制的事件发送到下游处理器(如果使用此方法,则在新事件到来之前可能无法获得最终结果):

userInfoKStream
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(10)))
    .aggregate(() -> "", (userId, newValue, currentValue) -> newValue)
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .foreach((userId, value) -> {});
何海
2023-03-14

由于您没有指定特定的客户端,我将向您展示如何使用ksqlDB和新添加的函数,LATEST\u BY\u OFFSET来实现这一点。

首先,我用源数据填充主题:

kafkacat -b broker:29092 -P -t test_topic -K: <<EOF
1:{ "userid": 1, "email": "user123@xyz.com" }
1:{ "userid": 1, "email": "user456@xyz.com" }
1:{ "userid": 1, "email": "user789@xyz.com" }
EOF

然后在ksqlDB模型中,首先将其作为事件流:

ksql> CREATE STREAM USER_UPDATES (USERID INT, EMAIL VARCHAR) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

ksql> SET 'auto.offset.reset' = 'earliest';                                                                                                                                                                                                                                         [35/60]
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT ROWKEY, USERID, EMAIL FROM USER_UPDATES EMIT CHANGES LIMIT 3;
+---------+---------+-----------------+
|ROWKEY   |USERID   |EMAIL            |
+---------+---------+-----------------+
|1        |1        |user123@xyz.com  |
|1        |1        |user456@xyz.com  |
|1        |1        |user789@xyz.com  |

现在,我们可以告诉ksqlDB获取这个事件流,并直接为我们提供最新的值(基于偏移量):

ksql> SELECT USERID, LATEST_BY_OFFSET(EMAIL) FROM USER_UPDATES GROUP BY USERID EMIT CHANGES;
+--------------------+--------------------+
|USERID              |KSQL_COL_1          |
+--------------------+--------------------+
|1                   |user789@xyz.com     |

Press CTRL-C to interrupt

或者更有用的是,如ksqlDB中的物质化状态:

CREATE TABLE USER_LATEST_STATE AS 
    SELECT USERID, LATEST_BY_OFFSET(EMAIL) AS EMAIL 
      FROM USER_UPDATES 
     GROUP BY USERID 
     EMIT CHANGES;

该表仍然由对Kafka主题的更改驱动,但可以直接查询当前状态,无论是从现在开始(“pull query”):

ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1;
+--------------------+
|EMAIL               |
+--------------------+
|user789@xyz.com     |
Query terminated
ksql>

或者作为状态演变的一系列变化(“推送查询”):

ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1 EMIT CHANGES;
+--------------------+
|EMAIL               |
+--------------------+
|user789@xyz.com     |

[ query continues indefinitely ]
 类似资料:
  • 本文向大家介绍Kafka 消费者是否可以消费指定分区消息?相关面试题,主要包含被问及Kafka 消费者是否可以消费指定分区消息?时的应答技巧和注意事项,需要的朋友参考一下 Kafa consumer消费消息时,向broker发出fetch请求去消费特定分区的消息,consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,customer拥有了offset的控制权,可

  • 我成功地为应用程序使用了logback,但无法从ch.qos.logback记录消息。我知道一个无法记录初始消息(下面的回答确认了这一点),这是有意义的。我不明白的是,为什么我还不能记录logback完成自身配置后生成的消息。 因此,没有办法告诉Logback将它自己的on-startup-log-events定向到文件附加器。 由ch.qos.logback.classic.net.smtpap

  • 虽然auto.offset.reset的值是最新的,但使用者从属于2天前的消息开始,然后就会赶上最新的消息。 我错过了什么?

  • 如何从动物园管理员那里获得最后一次偏移时间?当使用Storm喷口阅读来自Kafka的消息时。上下文:Kafka 不断获取消息,使用者读取一段时间,然后由于任何原因关闭,然后使用者仅读取最新消息,但不读取上次偏移量读取

  • 新服务器密钥是否仅限于消息传递? 说明:在firebase项目设置中,我可以获得“旧”和新服务器密钥(云消息选项卡)。旧版本无法通过发送推送通知https://fcm.googleapis.com/fcm/send 因为响应说它是一个遗留服务器密钥。但在这里,它可以被限制在某些谷歌API中https://console.developers.google.com/apis. 谷歌API控制台中没有

  • 英文原文:http://www.phpconcept.net/pclzip/news 译注: 原文中描述了最新消息的 PclZip 版本是 v2.6,而不是翻译时看到的 v2.8.2 因此可忽略本页内容,因为不够新了 PclZip 2.5 新特性 PclZip v2.5 引入了一个安全功能,以及修改压缩包内的文件名的功能。 为了实现这些功能,进行了大量代码修改,以保证对压缩包内文件列表的属性的维护