当前位置: 首页 > 知识库问答 >
问题:

MassTransit消费者传奇能由它所策划的相同信息发起吗?

宋鸿
2023-03-14

7.0中对Event Hub Riders的新支持,加上现有的InmemoryRepository对Sagas的支持,似乎可以提供一种基于相关消息流(例如,在建筑物中的所有传感器之间)创建聚合状态的简单方法。在这个场景中,建筑物的标识符将用作消息的CorrelationId、Saga和发送到事件集线器的EventData消息的PartitionKey,以确保同一消费服务实例在给定时间接收该设备的所有消息。根据Event Hub重新平衡的工作方式,可以假设在该服务运行的某个时刻,管理分区消息的服务实例将转移到一个新主机,该主机将开始读取建筑物中传感器发送的消息。那一刻:

    null

这带来的挑战是:在新的服务实例上,我们需要创建一个新的Saga来接管以前的Saga,但唯一知道给定实体不存在Saga的是masstransit:新实例上的任何东西都不知道从构建a读取的传感器是自该服务html" target="_blank">实例接管跟踪聚合构建a状态以来从构建a读取的第一个传感器。我们认为这可以通过在同一消息(datacollected)中同时标记initiatedbycourrates)来处理:

public class BuildingAggregator:
            ISaga,
            InitiatedBy<DataCollected>, //init saga on first DataCollected with a given CorrelationId seen
            Orchestrates<DataCollected> //then keep handling those in that saga
{
     //saga Consume methods
}

但是,当BuildingAggregator收到具有给定GUID的第二条DataCollected消息时,会引发以下异常:

Saga exception on receipt of MassTransitFW_POC.Program+DataCollected: The message cannot be accepted by an existing saga
   at MassTransit.Saga.Policies.NewSagaPolicy`2.MassTransit.Saga.ISagaPolicy<TSaga,TMessage>.Existing(SagaConsumeContext`2 context, IPipe`1 next)
   at MassTransit.Saga.SendSagaPipe`2.<Send>d__5.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at MassTransit.Saga.SendSagaPipe`2.<Send>d__5.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at MassTransit.Saga.InMemoryRepository.InMemorySagaRepositoryContextFactory`1.<Send>d__4`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---

有没有另一种实现这一逻辑的方法?这是应用传奇的“错误方式”吗?

共有1个答案

谢锦程
2023-03-14

根据Chris Patterson对上述问题的评论,这是可以通过状态机语法实现的:

Initially(
    When(DataCollected)
        .Then(f => _logger.LogInformation("Initiating Network Manager for Network: {NetworkId}", f.Data.NetworkId))
        .TransitionTo(Running));
During(Running,
    When(DataCollected)
        .Then(f => { // activities and state transitions }),
    When(SimulationComplete)
        .Then(f => _logger.LogInformation("Network {NetworkId} shutting down.", f.Instance.CorrelationId))
        .TransitionTo(Final));

请注意在初始状态转换和由初始条件设置的状态转换中如何处理DataCollected事件。

 类似资料:
  • 有以下消费者代码: 然后我用脚本生成消息: 问题是,当我将消费者作为两个不同的进程启动时,我会在每个进程中收到新消息。但是,我希望它只发送给一个消费者,而不是广播。 在Kafka的文献中(https://kafka.apache.org/documentation.html)其中写道: 如果所有使用者实例都具有相同的使用者组,则其工作原理就像在使用者之间平衡负载的传统队列一样。 我发现这些消费者的

  • 我是Kafka的新手,我有一个使用Java Apache Camel库实现的Kafka消费者。我发现的问题是-消费者花了很长的时间(>15分钟)来处理很少的消息-这对于我们的用例来说是很好的。 需要一些配置帮助,因为相同的消息会在15分钟后重新发送,如果在15分钟内没有处理(我相信线程控制不会返回)。我想这可能是默认间隔,不确定这是哪一个属性。 那么,我必须在哪里修复配置 生产者级别,以便它不重新

  • 有什么方法可以阻止Kafka的消费者在一段时间内消费信息吗?我希望消费者停止一段时间,然后开始消费最后一条未消费的消息。

  • 我们运行一个集群工作线程应用程序,该应用程序依赖于 Kafka 使用高级消费者 API 使用消息。群集中的所有节点共享同一个使用者组。现在我们想要的是将该逻辑的一部分迁移到 Kafka 流处理器 API。这里的方法是什么?如果分配了相同的 groupId/clientId,流拓扑是否会与现有使用者就消息进行斗争?我们应该分配不同的 groupId/clientId 吗?流式传输拓扑?说“组”。 “

  • 主要内容:1 PullMessageService拉取消息,2 PullMessageService#pullMessage拉取消息,3 DefaultMQPushConsumerImpl#pullMessage拉取消息,3.1 pullKernelImpl拉取消息,3.2 pullMessage发起拉取消息请求,4 总结基于RocketMQ release-4.9.3,深入的介绍了DefaultMQPushConsumer消费者发起拉取消息请求源码。 此前我们学习了DefaultMQPushC

  • 我正在使用spring kafka来消费来自kafka的消息。消费者监听器如下。 应用程序的单个实例,并发数为6。 该主题有6个分区。 从上面的日志中可以清楚地看到,两个用户在完全相同的时间收到了来自分区和偏移量的相同消息。 每个线程继续处理消息。最后,其中一个消费者失败了,错误如下 我知道上面的错误会在有负载或消息处理需要时间时出现。在这种情况下,处理不到一秒钟,kafka主题中的消息不到10条