当前位置: 首页 > 知识库问答 >
问题:

如何从方法内外中断同步订阅?

昌学
2023-03-14

问题:我订阅了一个永无止境的消息服务,我的代码需要检查任何消息是否满足条件,如果满足,然后在处理所有消息并返回true之前关闭订阅。如果我已经处理了所有的消息,但条件不满足,那么我需要关闭订阅并返回false。

例如,条件是foo=5:

message dataset early success :
msg1: foo=1
msg2: foo=2
msg3: foo=5 <= condition satisfied, return true and stop processing
msg4: foo=6

message dataset failure :
msg1: foo=1
msg2: foo=2
msg3: foo=3 
msg4: foo=4 <= no more messages, return false and stop processing

我使用的订阅有一个同步方法,我必须传递异步EventHandler。下面是适用于这两种情况的功能代码,LastMessageReceivedDateTime跟踪最后一次接收消息的时间(以标识消息的结尾),_conditionstatistified告诉我是否获得了数据:

private DateTime lastMessageReceivedDateTime;
private bool _conditionSatisfied;

public Task<bool> CheckSubscription(IThirdParyCode connection)
{
     var subscription = connection.Subscribe(async (obj, args) =>
     {
         lastMessageReceivedDateTime = DateTime.Now;
         if(args.Message.foo == 5)
         {
              _conditionSatisfied = true;
         }
     });

     while (lastMessageReceivedDateTime.AddSeconds(1) > DateTime.Now  && !_conditionSatisfied)
     {
         Thread.Sleep(500);
     }

     subscription?.Unsubscribe();
     return _activityCheckSatisfied;
}

更多信息:连接的类型是IstanConnection(来自NATS),Subscribe的签名是:

IStanSubscription Subscribe(string subject, StanSubscriptionOptions options,
    EventHandler<StanMsgHandlerArgs> handler);

我简化了签名,把重点放在我有问题的代码上。

共有1个答案

萧展鹏
2023-03-14

根据您的代码示例,我可以假设,如果在最后一条消息的一秒钟内没有新消息,则消息流结束。可以修改解决方案以消除活动等待循环,并将其替换为单个await调用。它将基于两项任务:

  1. 第一个任务将跟踪成功完成(在您的示例中_conditionsetfreed)并将由taskcompletionsource.setresult
  2. 设置
  3. 第二个任务将尝试通过使用CancellationToken任务包装器(这种包装器的示例实现)和CancellationTokenSource.cancelAfter的组合来发出流结束的信号,后者将尝试在每次迭代后取消任务并延迟。这将替换LastMessageReceivedDateTime.addSeconds(1)>DateTime.nowCondition.

修改后的代码应该如下所示:

private CancellationTokenSource streamEndCancellation = new CancellationTokenSource();
private TaskCompletionSource<bool> satisfiedCompletionSource = new TaskCompletionSource<bool>();

public async Task<bool> CheckSubscription(IThirdParyCode connection)
{
     // CancellationTokenTaskSource is in third-party library and not part of .NET
     var streamEndSource = new CancellationTokenTaskSource<bool>(streamEndCancellation.Token);

     var subscription = connection.Subscribe(async (obj, args) =>
     {
         lastMessageReceivedDateTime = DateTime.Now;
         if(args.Message.foo == 5)
         {
             satisfiedCompletionSource.SetResult(true);
         }
         streamEndCancellation.CancelAfter(1000);
     });

     Task<bool> actualTask = await Task.WhenAny<bool>(satisfiedCompletionSource.Task, streamEndSource.Task);
          
     subscription?.Unsubscribe();
     return !actualTask.IsCanceled;
}
 类似资料:
  • 问题内容: 问题 我正在通过ExecutorService运行某些外部方法的多次调用。我希望能够中断这些方法,但是不幸的是,它们不能自行检查中断标志。有什么办法可以强制从这些方法引发异常? 我知道从任意位置抛出异常是潜在的危险,在我的具体情况下,我愿意抓住这次机会并准备处理后果。 细节 “外部方法”是指某些来自外部库的方法,并且我无法修改其代码(可以,但是,每当发布新版本时,这都会成为维护的噩梦)

  • 我正试图从同步方法运行异步方法。但是我不能等待异步方法,因为我在同步方法中。我一定不理解TPL,因为这是我第一次使用它。 每个方法都需要前一个方法来完成,因为第一个方法的数据用于第二个方法。 Await运算符只能在异步方法中使用。考虑用'async'修饰符标记此方法,并将其返回类型更改为'task' 但是,如果我使用async修饰符,这将是一个异步操作。因此,如果我对的调用没有使用await运算符

  • 为什么下面的代码不能保证多个线程之间total_home数字的唯一性,即使逻辑处于同步块中。 } } } 这是一个程序示例。试着运行5-10次,你会发现total_home的值并不是每次都是唯一的。

  • 问题内容: 我正在查看包含同步方法的第三方库中的一些代码,在此方法中,有一个锁定在实例变量上的同步块。与此类似: 这有意义吗?如果是这样,在同步方法中使用同步语句有什么好处? 鉴于同步方法锁定了整个对象,对我来说似乎是多余的。在使用非私有的实例变量时,这种方法是否有意义? 问题答案: 在您的示例中,该方法 同时 锁定了和的实例。其他方法可能仅锁定对象的实例 或 对象。 因此,是的,这完全取决于他们

  • 问题内容: 由于异步方法问题,我收到一条错误消息。在我的终端中,我看到: 我注意到这是特别指出 这是我用于该部分的代码: 我试图弄清楚如何在此异步方法中使用a 。我该怎么办? 谢谢! 问题答案: 您可以在此处使用React模式来避免内存泄漏。 在您的构造函数中: } 在你的 在你里面 基于可取消承诺模式的推荐使用 Axios的 方法。因此,您可以在卸载组件的同时取消任何网络呼叫。这是Axios取消

  • 请看下面给我带来麻烦的方法: 然后是run方法: