当前位置: 首页 > 知识库问答 >
问题:

在满足条件之前,如何处理一条消息而不让消息离开队列?

沈永新
2023-03-14

这是关于一个特定的用例,我计划通过flink流来解决这个用例。

一个消息被发送到flink流处理,流被键控,从而得到预期的分区。然而,每个密钥的每个消息都需要评估,直到满足一个条件为止,例如,假设有一个银行系统,其中一个帐户的帐户交易(消息)需要按顺序处理,并且不可能处理不按顺序处理的消息,因为它将导致不一致的系统状态。系统需要等待一条消息被处理(甚至可能超过2-3天),然后再按顺序处理下一条消息。如何在flink中实现这一点,而不阻塞与其他密钥相关联的消息处理的任何部分?

共有1个答案

齐冥夜
2023-03-14

你看过CEP图书馆了吗?您可以指定如下模式:

Pattern<Event, ?> pattern = Pattern.<Event>begin("firstOfSequence").where(new FilterFunction<Event>() {
    private static final long serialVersionUID = 5726188262756267490L;

    @Override
    public boolean filter(Event value) throws Exception {
        return value.isFirstOfSequence();
    }
}).followedBy("secondOfSequence").where(new FilterFunction<Event>() {
    private static final long serialVersionUID = 5726188262756267490L;

    @Override
    public boolean filter(Event value) throws Exception {
        return value.isSecondOfSequence();
    }
});
 类似资料:
  • 问题内容: 通过Rabbitmq中的示例,消费者可以一次从队列中获取所有消息。如何使用一条消息并退出? 问题答案: 您必须声明basicQos设置,才能一次从ACK到NACK状态获取一条消息,并禁用自动ACK以便显式给出确认。 希望能帮助到你!

  • 当a不等于b时,我想通过bot发送自动discord消息。我有其他命令在工作,但它们都需要用户通过discord输入

  • 编辑:在我写的时候解决了这个问题:P--我喜欢这样的解决方案。我想无论如何我都要把它贴出来,也许别人也会有同样的问题,找到我的解决办法。不关心点数/因果报应等等。我只是把整个事情写了出来,所以我想我应该把它和解决方案贴出来。 我有一个SQS FIFO队列。它使用的是一纸空文队列。以下是它的配置方式: 我有一个单一的生产者微服务,我有10个ECS映像运行作为消费者。 由于业务原因,我们在接近消息在队

  • 我有和这里描述的相同的问题:ActiveMQ:一条挂起的消息,但队列是空的。 更多客户端: 与hawtio-相同的行为 我的java消费者 不消耗任何东西 重启后,继续消耗 我们不将消息保存在默认的kaha db中,而是保存在Oracle DB中。我可以在表ACTIVEMQ_MSGS中看到数据库中的待处理消息。 重新启动activeMQ后,所有客户端都按预期工作。在管理控制台中,我可以看到消息,j

  • 我有两个消费者(不同的应用程序)连接到Azure队列。我可以或消息,在消费过程中,我可以或消息。裁判:http://msdn.microsoft.com/en-us/library/azure/hh851750.aspx. 我确定我想使用,然后消息,因为我希望它们在两个应用程序中被接收。我想我应该将队列上的消息生存期设置为10秒,作为删除机制。 然而,由于消息似乎在10秒钟后被删除,因此在这10秒

  • 我希望对RabbitMQ中的队列有这样的约束: 编辑(澄清):将有许多消费者都试图从所有队列中获取工作,由于他们无法从一个队列中获取工作,该队列中处理的事件未被加密,所以有序处理将被维护。