当前位置: 首页 > 知识库问答 >
问题:

kafka流-所有组成员处理一个全局事件

舒仲渊
2023-03-14

我的问题是:我应该如何构造我的kafka应用程序,使其能够从一个主题的单个分区中使用另一个具有多个分区的主题所发生的“全局”事件中做出反应?

如果我有以下场景:我有一个gigs主题,它每次宣布一个新的演出时都会得到一条新消息,键是艺术家的名字,值是一个带有日期、位置等的avro对象。主题有一个分区。

示例键:ABBA示例值:{“日期”:“2020-08-21t09:00:00z”,“地点”:“O2伦敦”}

我有另一个紧凑的主题,名为subscribes,其中包含订阅了artist的用户,他们希望在他们感兴趣的艺术家的演出宣布时收到通知。这里的键是一个avro键,以用户ID和艺术家姓名作为值,该值是一个avro对象,带有电子邮件和其他首选项。该主题有10个分区。

示例键:{“user”:123,“artist”:“abba”}示例值:{“email”:“123@email.com”,...}

我有很多用户,但没有那么多的工作,我想运行应用程序的10个实例,因为这是subscribes主题的分区计数。

当gig生成到gigs时,每个在subscribes主题中有消息的用户都会使用这两个主题,其中key.artist等于gigs生成到第三个主题的消息。我可以通过扫描从subscribes构建的本地存储来实现这一点。

对于这样的应用程序,人们将如何使用kafka流(或者仅仅是消费者)?文档中提到全局表是向所有成员广播事件的一种方式,但我遇到了不同的问题:

  1. 如果我用subscribes中的KTable和gigs中的GlobalTable编写这个应用程序,那么我就不能使用DSL连接这两个应用程序。用户可以在subscribes中停留一段未知的时间,因此我不能使用KStream来连接GlobalTable.
  2. 使用处理器api,当使用新消息时,我无法从全局上下文(从gigs)将全局表转发到本地上下文(在本地上下文中,我可以从subscribes)访问本地可查询存储区。
  3. 即使我可以在2中从全局上下文转发到本地上下文。我会遇到这样的问题:每当应用程序重新启动时,它会从一开始读取主题,因为globaltables不保留偏移量,所以我最终会一次又一次地向输出主题发送相同的消息。

我想到的解决方案是:使用gigs主题,并生成具有10个分区的gigs_broadcast,并向每个分区写入相同的消息。这样,每个成员都可以从每个存储区分配相同的分区,因此每当消息到达gigs时,它就在本地上下文中,并可以将其转发到另一个处理器,该处理器可以从subscribes访问可查询存储区,以便进行扫描。

有没有更好的解决办法?也许通过使用消费者+生产者而不是Kafka流?理想情况下,我知道这两个主题都有相同的键,但我不确定这是否可行:我必须将subscribes流重新键到艺术家,执行groupBy,聚合每个艺术家的用户ID列表,然后将其连接到gigs主题,以便在它们上进行平面映射,生成gigs_customers,其中键与subscribes相同,值也相同。

共有1个答案

汪晨
2023-03-14

听起来像是外键连接。您可以将这两个主题作为KTables来读取,并将艺术家作为join属性来使用。您的User/Subscription表将是您的左手边表,您使用key-extractor提取到artist作为join属性并连接到gigs表。

 类似资料:
  • 希望创建一个DynamoDB全局表来存储客户信息。我的问题是,我目前的模式是监听此表上的更改,并使用Lambda触发器发送电子邮件更新。 i、 e.您的个人资料信息已更改。如果不是你。。 我现在是否需要在每个区域中使用该Lambda?数据复制是否意味着每个区域都会触发该Lambda?

  • 我知道这里之前有人问过这个问题:Kafka流并发? 但这对我来说很奇怪。根据文档(或者我可能遗漏了什么),每个分区都有一个任务,这意味着不同的处理器实例,每个任务由不同的线程执行。但是当我测试它的时候,我看到不同的线程可以得到不同的处理器实例。因此,如果你想在处理器中保持内存状态(老式的方式),你必须锁定? 线程ID:88 ID:c667e669-9023-494b-9345-236777e9df

  • 我将一些事件转发给Kafka并启动了我的Kafka流程序。我的程序开始处理事件并完成。一段时间后,我停止了我的Kafka流应用程序并重新开始。观察到我的Kafka流程序正在处理已经处理过的先前事件。 根据我的理解,Kafka流在内部维护每个应用程序id的输入主题本身的偏移量。但在这里重新处理已经处理的事件。 如何验证Kafka流处理的偏移量?Kafka流是如何保存这些书签的?根据什么 如果Kafk

  • 统一错误处理 文档:https://eggjs.org/zh-cn/tutorials/restful.html 自定义一个异常基类 // app / exceptions / http_exceptions.js class HttpExceptions extends Error { constructor(msg='服务器异常', code=1, httpCode=400) {

  • 问题内容: 假设我定义了一些全局Ajax事件处理程序(ajaxStart,ajaxStop和ajaxError)。通常,我对此表示满意,但对于一个请求,我想禁用ajaxError处理程序,但仍照常运行ajaxStart和ajaxStop处理程序。在jQuery的AJAX功能的文件中提到,可以设置为false,并传递到$就功能禁用所有全局AJAX事件处理程序的“全球性”的参数,但他们没有提及任何办法