当前位置: 首页 > 知识库问答 >
问题:

如何使用Uber Cadence设计一个无状态工作线程来只处理一次消息

云浩然
2023-03-14

请帮助我们采用Cadence:D

这是当前的设计。一些无状态工作人员从集中式队列中提取消息来处理它。工作人员中涉及复杂的业务逻辑以及Deduper功能,该功能利用单独的Redis集群作为远程分布式缓存(使用共识的强一致性)。该缓存仅存储消息ID及其状态,或者“正在进行”、“已完成”和“未启动”。显然,工作人员应该处理未完成的消息。

就我个人而言,我想重新考虑所有可能的解决方案。我想到了工作流模型,因为我对AWS SWF有着愉快的体验。由于我们所有的服务都是用go编写的,并在我们自己的数据中心上运行,我想试试优步Cadence(SWF的开源)。

我看了很多优步用户的视频,我认为第一步是从一个新工作流中的一个活动开始,然后将其分解为多个活动,或者在我们将其迁移到AWS后再进行AWS lambda。

所以我在这里列出了所有要求

  1. 避免多个工作人员对消息进行两次处理。
  2. 50k 要求/秒,因此需要可扩展的解决方案
  3. p99 上的低延迟,

目前,只有第一个要求令人头疼,因为Redis缓存是一个远程缓存集群。prod中存在一些连接性问题,我们真的希望消除它,以避免复杂性和额外的网络跳数。

问题:

  1. 所以我想知道在切换到Cadence时如何设计deduper

通过阅读文档,Cadence 在域内提供了工作流 ID 唯一性功能。也许我可以使用消息 ID 作为工作流 ID 的一部分,例如 WF-00001,以确保域内没有重复项。只要我只使用一个域,就不会有问题。然后我不知道这种方法的局限性。例如,域中允许的工作流数。我们有 50k 消息处理速率 /s (峰值)

我不确定这是否是正确的方法。欢迎更多想法。

谢谢

SWF 步进函数 优步节奏

共有1个答案

柯瀚海
2023-03-14

在高层次上,Cadence非常适合您的用例。

>

  • 重复数据消除非常简单。工作流保留最近的请求ID(或属于给定workflowID的所有请求,如果它们的数量有界)的映射,并对其执行重复检查。

    大多数踏频限制都是特定于部署且可配置的。让我们在 Slack 上讨论您的特定用例。

  •  类似资料:
    • 我是一个很新的编程人员,我正在尝试使用Javascript制作岩石剪刀,我遇到了这个问题。每当我点击岩石图像,它会产生一个0和2之间的随机数,如果这个数字是0,那就是平局,如果是1,那玩家输了,如果是2,那玩家赢了。但当我运行这段代码时,它只在数字为0时才起作用。所以当我点击按钮时,如果数字是0,它会显示“它是一个领带”,但如果不是0,它不会显示任何东西。但当我再次尝试单击时,它不起作用。我必须刷

    • null 当工作线程处理消息时,如果处理时间超过5分钟,我希望生成一条警告消息,但仍然让工作线程继续处理。 问题 我希望不断检查工作线程是否超过了5分钟的消息处理时间,如果超过了阈值时间,那么我希望记录一条错误消息,但仍然让工作线程按原样继续。 工人阶层

    • 问题内容: 我假设如果实例变量是由spring IOC管理的,并且是单例的,则设计可以被称为无状态和线程安全的,因此这种设计可以扩展到集群服务器。我的假设是否正确?概述如下? 然后将其注入: 问题答案: Spring bean不是无状态的,因为它们具有状态(字段)。从技术上讲,它们甚至不是一成不变的,因为您可以随时更改注入的字段。 但是,您可以通过使用字段和构造函数注入轻松地使Spring bea

    • 我最近在我的计算机上安装了一个本地Kafka,用于测试和开发: 3 经纪人 一个输入主题 主题和弹性搜索之间的 Kafka 连接接收器 我设法在独立模式下配置它,所以一切都是本地主机,Kafka connect是使用脚本。 我现在想做的是在分布式模式下运行我的连接器,这样Kafka消息就可以分成两个工作者。我已经启动了两个工作者(仍然是同一台机器上的所有东西),但是当我向我的Kafka主题发送消息

    • 我试图理解fork-join的窃取部分。fork-join池具有具有自己Deque的工作线程。如果工作线程自身的deque为空,则该线程从另一个工作线程中窃取。 线程如何访问其他线程的状态? 当所有者线程和窃取者线程尝试访问取消排队中的同一项目时,它不会产生同步问题吗?

    • 我正在努力寻找Android上TimerTask函数的文档。我需要使用TimerTask每隔一段时间运行一个线程,但我不知道如何执行此操作。如有任何建议或示例,将不胜感激。