从文档中可以清楚地看出,当KCL应用程序以TRIM_HORIZON作为迭代器类型启动时,记录是从流的开头读取的。文档还提到,应用程序的状态通过使用检查点在DynamoDB表中维护。
但是,我没有找到任何参考,说明KCL应用程序如何使用此DynamoDB表信息。
具体来说,我的问题如下-我有一个保留期为168小时的流,这是大量的数据。假设我的KCL(从TRIM_HORIZON上的迭代器开始)与传入的数据同步,并在流的末尾处理记录,并定期进行检查。现在,如果我重新启动我的KCL,它会从流的开头开始读取数据(168小时之前),但仍然使用DynamoDB表查看检查点并跳到最新记录,还是根本不使用检查点信息并且从开始读取流?
在后一种情况下,不需要重新处理大量数据。
我应该手动使用DynamoDB表中的序列号来获取分片迭代器吗?
当KCL应用程序重新启动时,它会自动从DynamoDB表恢复其状态,所以您不需要手动执行任何操作。处理从重新启动前的最后一个检查点开始继续,因此,如果重新启动意外发生,并且应用程序在退出前没有机会检查点,请准备好处理几个重复的项目(尽管可能还有其他重复的原因)
重新启动时,请确保提供与上次启动时相同的应用程序名称。否则,KCL会将这种情况视为创建一个新的独立应用程序,创建一个新的DynamoDB表并开始完全独立的处理。
我在玩春云流和RabbitMQ。 我有一个生成消息的RESTendpoint。 通过另一个应用程序,我正在消费这些消息。 当两个应用程序都启动并运行时。我可以发布消息并在消费者处使用它。 我在以下场景中面临的问题: 我故意让消费者失望,并通过制作人发布消息。现在,当消费者启动时,它没有收到任何消息 我想RabbitMQ保证消息传递 Github链接https://github.com/govi20
我正在开发一个Spring Boot应用程序,它使用以Kafka主题为源的Spring集成流。我们的集成流程开始使用一个包含带有spring framework . cloud . stream . annotation . input和Output注释的SubscribableChannels的接口。这些被配置为通过spring . Cloud . stream . Kafka . bindin
我想在一个spring boot应用程序中创建多个Kafka消费者组,以处理不同的Kafka队列。需求场景基于消息的关键性,它应该被推送到不同的Kafka队列。为了管理不同的Kafka队列,我想创建一个专用的Kafka消费群体。但我不确定我是否可以在一个spring boot应用程序中创建多个Kafka消费群体。 目前,我有三个Kafka主题,每个主题有4个部分,只有一个Kafka消费群体有三个K
我已经用Apache ActiveMQ和一个简单的应用程序创建了一个JMS代理,该应用程序将消息纳入队列OK。 我想创建另一个简单的应用程序,使用MDP异步出列这些消息。以下是我到目前为止所拥有的一个例子: 现在我大概需要一个main方法,但是如果消息到达队列时监听器会异步调用onMessage方法,我不确定如何编写代码: 谢谢你的帮助。
问题内容: 我尝试使用范围类型APPLICATION和带有@ Create,@ Beg的方法来注释类,但这似乎不起作用。 我想要的是在应用程序启动时立即启动无限循环。 问题答案: 如果希望在初始化后立即执行方法,则可以使用以下注释:
现在,我有一个Spring Boot CLI应用程序,当应用程序启动时,它会自动启动Kafka消费者。我的任务是更新提供API的应用程序,允许在特定条件下启动或停止Kafka消费者。所以,我将使用SpringBootStarterWeb创建该API。但我找不到一种方法来手动管理消费过程。我需要的是 在不使用消费者的情况下启动API 关于如何手动管理消费过程的任何建议? 技术细节: 用于创建侦听器