假设我对同一个消息键有不同的值。
例如:
{
userid: 1,
email: user123@xyz.com }
{
userid: 1,
email: user456@xyz.com }
{
userid: 1,
email: user789@xyz.com }
在上述情况下,我只需要用户更新的最新值,即“user789@xyz.com”。
我的kafka流应该只给我第三个值,而不是前两个值。
您需要Kafka日志压缩
。简而言之,如果您希望您的主题只保留特定键的最后一个值,您应该设置属性log.cleanup.policy=压缩
。您可以在此处找到有关它的更多信息。
似乎您希望在进一步处理之前缓冲记录。因为在流媒体中,你有不断增长、无限的数据集,所以你永远不知道是要等待更多的记录,还是要刷新缓冲区以进行进一步处理。你能补充更多关于如何处理这些记录的细节吗?
您可以引入一个附加参数,即刷新缓冲区之前要等待的最长时间。要对此进行归档,您可以使用会话窗口或滚动窗口,或者使用与提交间隔相关联的记录缓存,或者也可以使用Kafka低级别处理器API实现它。
下面的示例代码显示了如何使用翻滚窗口对此进行存档,以在1小时的时间窗口内聚合和抑制所有用户ID信息,接受延迟10分钟的事件,然后将抑制的事件发送到下游处理器(如果使用此方法,则在新事件到来之前可能无法获得最终结果):
userInfoKStream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(10)))
.aggregate(() -> "", (userId, newValue, currentValue) -> newValue)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.foreach((userId, value) -> {});
由于您没有指定特定的客户端,我将向您展示如何使用ksqlDB和新添加的函数,LATEST\u BY\u OFFSET
来实现这一点。
首先,我用源数据填充主题:
kafkacat -b broker:29092 -P -t test_topic -K: <<EOF
1:{ "userid": 1, "email": "user123@xyz.com" }
1:{ "userid": 1, "email": "user456@xyz.com" }
1:{ "userid": 1, "email": "user789@xyz.com" }
EOF
然后在ksqlDB模型中,首先将其作为事件流:
ksql> CREATE STREAM USER_UPDATES (USERID INT, EMAIL VARCHAR) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
ksql> SET 'auto.offset.reset' = 'earliest'; [35/60]
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT ROWKEY, USERID, EMAIL FROM USER_UPDATES EMIT CHANGES LIMIT 3;
+---------+---------+-----------------+
|ROWKEY |USERID |EMAIL |
+---------+---------+-----------------+
|1 |1 |user123@xyz.com |
|1 |1 |user456@xyz.com |
|1 |1 |user789@xyz.com |
现在,我们可以告诉ksqlDB获取这个事件流,并直接为我们提供最新的值(基于偏移量):
ksql> SELECT USERID, LATEST_BY_OFFSET(EMAIL) FROM USER_UPDATES GROUP BY USERID EMIT CHANGES;
+--------------------+--------------------+
|USERID |KSQL_COL_1 |
+--------------------+--------------------+
|1 |user789@xyz.com |
Press CTRL-C to interrupt
或者更有用的是,如ksqlDB中的物质化状态:
CREATE TABLE USER_LATEST_STATE AS
SELECT USERID, LATEST_BY_OFFSET(EMAIL) AS EMAIL
FROM USER_UPDATES
GROUP BY USERID
EMIT CHANGES;
该表仍然由对Kafka主题的更改驱动,但可以直接查询当前状态,无论是从现在开始(“pull query”):
ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1;
+--------------------+
|EMAIL |
+--------------------+
|user789@xyz.com |
Query terminated
ksql>
或者作为状态演变的一系列变化(“推送查询”):
ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1 EMIT CHANGES;
+--------------------+
|EMAIL |
+--------------------+
|user789@xyz.com |
[ query continues indefinitely ]
本文向大家介绍Kafka 消费者是否可以消费指定分区消息?相关面试题,主要包含被问及Kafka 消费者是否可以消费指定分区消息?时的应答技巧和注意事项,需要的朋友参考一下 Kafa consumer消费消息时,向broker发出fetch请求去消费特定分区的消息,consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,customer拥有了offset的控制权,可
我成功地为应用程序使用了logback,但无法从ch.qos.logback记录消息。我知道一个无法记录初始消息(下面的回答确认了这一点),这是有意义的。我不明白的是,为什么我还不能记录logback完成自身配置后生成的消息。 因此,没有办法告诉Logback将它自己的on-startup-log-events定向到文件附加器。 由ch.qos.logback.classic.net.smtpap
虽然auto.offset.reset的值是最新的,但使用者从属于2天前的消息开始,然后就会赶上最新的消息。 我错过了什么?
如何从动物园管理员那里获得最后一次偏移时间?当使用Storm喷口阅读来自Kafka的消息时。上下文:Kafka 不断获取消息,使用者读取一段时间,然后由于任何原因关闭,然后使用者仅读取最新消息,但不读取上次偏移量读取
新服务器密钥是否仅限于消息传递? 说明:在firebase项目设置中,我可以获得“旧”和新服务器密钥(云消息选项卡)。旧版本无法通过发送推送通知https://fcm.googleapis.com/fcm/send 因为响应说它是一个遗留服务器密钥。但在这里,它可以被限制在某些谷歌API中https://console.developers.google.com/apis. 谷歌API控制台中没有
英文原文:http://www.phpconcept.net/pclzip/news 译注: 原文中描述了最新消息的 PclZip 版本是 v2.6,而不是翻译时看到的 v2.8.2 因此可忽略本页内容,因为不够新了 PclZip 2.5 新特性 PclZip v2.5 引入了一个安全功能,以及修改压缩包内的文件名的功能。 为了实现这些功能,进行了大量代码修改,以保证对压缩包内文件列表的属性的维护