当前位置: 首页 > 知识库问答 >
问题:

如何在KCL中重新排序异步发送的Kinesis事件

南门魁
2023-03-14

除了切换到使用阻塞同步Kinesis客户端之外,还有其他解决方案可以有效地对流事件进行排序吗?

共有1个答案

太叔涵亮
2023-03-14

如果排序很重要,请不要使用异步客户端。

异步客户端只是使用隐藏的线程池来进行完全相同的调用--因为它是多线程的,所以您不能保证这些线程的执行顺序,因此,您无法控制Kinesis接收这些记录的顺序。

现在,如果延迟真的是您的生产者的一个问题:

 类似资料:
  • 我们正在构建基于Kinesis/DynamoDB流的服务,我们对检查点的行为有以下问题。 我们有一个worker,它以以下配置开始,即InitialPositionInStream(InitialPositionInStream.LATEST),并且KCL应用程序的名称始终相同。 通过关闭和再次打开工作线程,我们观察到,它不会从流的末尾开始消耗,因为我们有一个滞后指标,我们看到,当工作线程打开时,

  • 每个Kinesis应用程序都必须包括这三个组件: > iRecordProcessor接口

  • 寻找有关以下用例的建议或解决方案 应用程序接收按功能键(如员工id)标识的更改时间排序的消息。功能键可以有多条消息 每条消息都会触发一个工作流。如果员工有待定工作流,则希望将新消息排队,直到待定工作流完成 是否有任何方法可以在节奏中对消息重新排序,以将它们作为由消息中的功能键标识的组进行处理?

  • 我试图找出如何在事务上下文中正确处理原子级的对Kafka的多次写入。在此场景中,事务不是由kafka消息侦听器启动的,而是通过@Transactional注释以编程方式启动的,请参见下面的代码段。 我使用的是spring boot 2.4.2和spring kafka 2.6.5。 KafkaProducer文档指出,在事务上下文中,不需要调用。get(),因为它最终会在尝试提交事务时引发异常。此

  • 问题内容: 我有一张像下面的桌子, 我想使用“名称”列按字母顺序重新排序,并使用此新顺序重置ID(自动递增),以得到以下结果 问题 :如何使用MYSQL执行此操作? 问题答案: 请问您为什么要这么做? 如果有人修改了任何名称值或插入了新行,则会使您的订购方案混乱。试图以表的其他位置(名称列)已经可用的PK顺序存储一些含义似乎是多余的,因此是个坏主意。 更好的解决方案是不用担心ID列的值,而在应用程

  • 问题内容: 如果我有列表,如何以任意方式重新排序商品? 编辑:我不想洗牌。我想以预定义的方式对它们进行重新排序。(例如,我知道旧列表中的第3个元素应成为新列表中的第一个元素) 问题答案: 你可以这样