当前位置: 首页 > 知识库问答 >
问题:

Kafka Topic-过滤和分派消息

子车雅珺
2023-03-14

我们的软件解决方案为每个客户收集数据(“事件”)<一些客户(一小部分约3%)要求将这些数据输入“他们的系统”(他们需要为此服务付费)<我们需要发送这些事件的目标系统可能是:

  1. AWS S3
  2. Azure存储
  3. Splunk
  4. 数据狗
  5. 未来会有更多的目标系统...

上面的所有目标系统都有众所周知的Kafka Connect接收器连接器,因此我们的想法是使用这些连接器来导出数据。

  1. 所有客户事件都转到一个“输入”主题
  2. 自定义软件使用来自Kafka“输入”主题的消息
  3. 该软件查看消息属性,并根据其中一个属性值(称为customer\u id)决定是否应删除消息或将其发布到另一个名为'

目标主题可能是另一个集群的一部分。我知道这可以很容易地做到使用Kafka流。

请注意,我知道Kafka流中的Disperse消息线程

我的问题是-可以使用Kafka Connect和SMT吗<我正在寻找一个“托管”解决方案,因为我们的Kafka在AWS MSK中运行,所以我不需要管理Kafka Connect群集。有了Kafka Streams,我必须在EC2/ECS上安装我的软件,不是吗

共有1个答案

松英喆
2023-03-14

目标主题可能是另一个集群的一部分。我知道使用Kafka流很容易做到这一点

Kafka流只能/应该写入同一个集群。它不能保证交付给其他人。

对于将数据发送到其他集群,MirrorMaker将是一个起点。

如您所知,RegexRout可以重命名主题,但它不能从记录中提取动态字段并重命名主题-您需要为此编写自己的转换。

您还应该能够使用过滤器转换来检查/删除事件,但这只适用于顶级字段,而不适用于嵌套字段。

总的来说,我发现“每个id”有一个主题名是一个糟糕的设计,假设你可能(最终)有成千上万个id。

或者,“每个客户”管理数万到数千个集群(或者,至少使用“每个客户”的配额分割集群,尽管不确定多租户如何处理重复的主题名称)可能也很困难,但这基本上是MSK或融合云正在做的事情。

 类似资料:
  • 问题内容: 我有一个由django-tables2生成的工作表: 上面的代码返回一个包含数百个对象的表,这些对象整齐地分页,每页10个项目。当我单击表格底部的“下一步”时,分页效果很好,并且可以浏览不同的页面。但是,我注意到以下行为: 单击以显示原始未过滤表的子集 单击过滤表底部的“下一步”将显示未过滤表的第二页 再次单击将显示过滤后的表格的第二页 我希望过滤器在浏览不同页面时能够保持不变。我在这

  • 我正在尝试使用TwilioAPI,我想使用消息内容作为过滤器。所以我想提出一个要求https://api.twilio.com/2010-04-01/Accounts/AccSID/Messages.json?body=“test”,它将包括所有正文中包含单词“test”的消息,无论是入站消息还是出站消息。 在Twilio文档中,我只能找到按发送日期过滤的邮件。上述用例可以通过Twilio实现吗?

  • 我正在Drupal7中创建一个自定义模块,使用views_datasource模块提取Json格式的数据,并在前端对数据进行角过滤(来自view的结果不到50个)。我遇到的问题是当我尝试对结果进行分页时。该应用程序显示所有结果,底部有分页编号。当我输入筛选器文本输入时,结果被正确筛选,分页反应正常(收缩和扩展)。我试图做的是使我的显示结果和分页一起工作,这样每个页面的最大结果数是2。我对此非常陌生

  • 在 MySQL 中,可以使用 HAVING 关键字对分组后的数据进行过滤。 使用 HAVING 关键字的语法格式如下: HAVING <查询条件> HAVING 关键字和 WHERE 关键字都可以用来过滤数据,且 HAVING 支持 WHERE 关键字中所有的操作符和语法。 但是 WHERE 和 HAVING 关键字也存在以下几点差异: 一般情况下,WHERE 用于过滤数据行,而 HAVING 用

  • Spring Boot上有一个应用程序,它把它的jms请求放在一个队列中,然后从另一个队列中获取答案。还有一个应用程序正在处理响应队列。请求~每秒100次。实际上,问题是如何处理它们并选择我需要的?现在我使用@JmsListener读取队列中的所有消息,但是那些不适合我的应用程序的消息被绘制出来。创建一个线程并分别等待答案,在我看来,这似乎不是一个好主意,因为可以有几千个线程。如何成为?

  • 删除节点内指定的消息过滤器。 调用: web3.shh.deleteMessageFilter(id) 参数: id:String, 过滤器ID,shh.newMessageFilter()返回 返回值: Boolean: 成功时返回true,失败时返回false 示例代码: web3.shh.deleteMessageFilter('2b47fbafb3cce24570812a82e6e93c