当前位置: 首页 > 知识库问答 >
问题:

Kafka将流作为表补丁日志,而不是完整发布

公良理
2023-03-14

期望的功能:对于给定的密钥key123,许多服务并行运行,并将其结果报告给单个位置,一旦为key123收集了所有结果,就会将其传递给新的下游消费者。

最初的想法:使用AWS DynamoDB保存给定条目的所有结果。每次结果准备就绪时,微服务都会对key123上的数据库执行PATCH操作。输出流检查每个UPDATE以查看条目是否完整,如果是,则将其转发到下游。

新想法:使用Kafka Streams和KSQL实现相同的目标。所有服务都将其输出写入结果主题,该主题形成一个更改日志Kstream,我们可以通过KSQL查询完整的条目。比如:

CREATE STREAM competed_results FROM results_stream SELECT * WHERE (all results != NULL). 

我不确定该怎么做的部分是流上的补丁操作。要让输出流显示key123的所有消息的累积,而不是仅显示最近的消息?

KSQL用户,这有意义吗?我是否接近于一个以前有人做过的解决方案?

共有1个答案

房时铭
2023-03-14

如果您可以使用密钥集为同一主题生成所有事件,那么您可以使用ksqlDB中的聚合来收集特定密钥的所有事件,例如:

CREATE STREAM source (
    KEY INT KEY,  -- example key to group by
    EVENT STRING  -- example event to collect
  ) WITH (
   kafka_topic='source', -- or whatever your source topic is called.
   value_format='json' -- or whatever value format you need.
);

CREATE TABLE agg AS
  SELECT 
    key, 
    COLLECT_LIST(event) as events
  FROM source
  GROUP BY key;

默认情况下,这将创建一个名为AGG的变更日志主题。当收到源主题上特定键的新事件时,ksqlDB将向AGG主题生成消息,该键设置为key,该值包含该键看到的所有事件的列表。

然后,您可以将此变更日志作为流导入:

CREATE STREAM agg_stream (
   KEY INT KEY,
   EVENTS ARRAY<STRING> 
) WITH (
   kafka_topic='AGG',
   value_format='json'
);

然后,您可以应用一些标准来过滤流,只包括最终结果:

STREAM competed_results AS 
  SELECT 
    * 
  FROM agg_stream 
  WHERE ARRAY_LEN(EVENTS) = 5; -- example 'complete' criteria.

您甚至可能希望使用用户定义的函数来定义您的完整标准:

STREAM competed_results AS 
  SELECT 
    * 
  FROM agg_stream 
  WHERE IS_COMPLETE(EVENTS);

 类似资料:
  • 补丁是一个文本文件,其内容类似于,但与代码一样,它也有关于提交的元数据; 例如提交ID,日期,提交消息等。我们可以从提交创建一个补丁,而其他人可以将它们应用到他们的存储库。 假设我们在项目实现了一个函数。并将编写的代码的路径并发送给其他开发人员。 然后,其他开发人员可以将接收的补丁应用到自己的代码中。 我们使用命令创建最新提交的修补程序。 如果要为特定提交创建修补程序,请在命令后面指定 。 上述命

  • Git 中的一些命令是以引入的变更即提交这样的概念为中心的,这样一系列的提交,就是一系列的补丁。 这些命令以这样的方式来管理你的分支。 git cherry-pick git cherry-pick 命令用来获得在单个提交中引入的变更,然后尝试将作为一个新的提交引入到你当前分支上。 从一个分支单独一个或者两个提交而不是合并整个分支的所有变更是非常有用的。 在 变基与拣选工作流 一节中描述和演示了

  • 问题内容: 我有两个档案 秒杀 test_spike.py: 当我运行时,第一个测试用例将通过,但是第二个将失败。而我切换到use ,则两个都失败了。 我不明白这是怎么发生的?这些情况本应全部通过。 问题答案: 对于 test_foo, 您没有正确使用补丁。您应该像这样使用它: 这给了我: 现在,第二个示例不起作用,因为您导入了bar函数(获取对其的引用),然后尝试对其进行模拟。模拟某些内容时,您

  • 问题内容: 我遇到了一个似乎很著名的问题:我的updatepanel触发了完整的回发而不是异步的回发。正常的解决方案是给您动态添加的所有控件一个ID,这已经完成了,但是我仍然得到完整的回发而不是异步回发… 这是代码: HTML: 有趣的是背后的C#代码(方法PlayerItems_ItemDataBound),如下所示: 因此,我实际上向菜单添加了AsyncPostBackTrigger,因此应该

  • 假设我将一个KStream聚合到一个KTable,将一个KStream聚合到一个KTable。和都不传递空值(删除事件被聚合为快照的状态属性)。此时,我们可以假设对于和聚合都有一个持久化的kafka changelog主题和一个rocksDB本地存储。然后,我的拓扑将与连接起来,生成一个连接的。也就是说,我的问题是和物化生命周期(包括changelog主题和本地rocksdb存储)。假设主题和主题

  • 问题内容: 我想嘲笑。但是,当我运行以下代码时,该模拟被完全忽略了,我不确定为什么 测试代码: 模块代码: 问题答案: 您没有在正确的位置打补丁。您在定义的地方打补丁: 您需要修补导入的位置,即在编写此行的“模块代码”中: 即,它应该看起来像: 要获得快速指南,请阅读文档中的部分:修补位置。