问题:我订阅了一个永无止境的消息服务,我的代码需要检查任何消息是否满足条件,如果满足,然后在处理所有消息并返回true之前关闭订阅。如果我已经处理了所有的消息,但条件不满足,那么我需要关闭订阅并返回false。
例如,条件是foo=5
:
message dataset early success :
msg1: foo=1
msg2: foo=2
msg3: foo=5 <= condition satisfied, return true and stop processing
msg4: foo=6
message dataset failure :
msg1: foo=1
msg2: foo=2
msg3: foo=3
msg4: foo=4 <= no more messages, return false and stop processing
我使用的订阅有一个同步方法,我必须传递异步EventHandler
。下面是适用于这两种情况的功能代码,LastMessageReceivedDateTime
跟踪最后一次接收消息的时间(以标识消息的结尾),_conditionstatistified
告诉我是否获得了数据:
private DateTime lastMessageReceivedDateTime;
private bool _conditionSatisfied;
public Task<bool> CheckSubscription(IThirdParyCode connection)
{
var subscription = connection.Subscribe(async (obj, args) =>
{
lastMessageReceivedDateTime = DateTime.Now;
if(args.Message.foo == 5)
{
_conditionSatisfied = true;
}
});
while (lastMessageReceivedDateTime.AddSeconds(1) > DateTime.Now && !_conditionSatisfied)
{
Thread.Sleep(500);
}
subscription?.Unsubscribe();
return _activityCheckSatisfied;
}
更多信息:连接
的类型是IstanConnection
(来自NATS),Subscribe
的签名是:
IStanSubscription Subscribe(string subject, StanSubscriptionOptions options,
EventHandler<StanMsgHandlerArgs> handler);
我简化了签名,把重点放在我有问题的代码上。
根据您的代码示例,我可以假设,如果在最后一条消息的一秒钟内没有新消息,则消息流结束。可以修改解决方案以消除活动等待循环,并将其替换为单个await
调用。它将基于两项任务:
_conditionsetfreed
)并将由taskcompletionsource.setresultLastMessageReceivedDateTime.addSeconds(1)>DateTime.now
Condition.修改后的代码应该如下所示:
private CancellationTokenSource streamEndCancellation = new CancellationTokenSource();
private TaskCompletionSource<bool> satisfiedCompletionSource = new TaskCompletionSource<bool>();
public async Task<bool> CheckSubscription(IThirdParyCode connection)
{
// CancellationTokenTaskSource is in third-party library and not part of .NET
var streamEndSource = new CancellationTokenTaskSource<bool>(streamEndCancellation.Token);
var subscription = connection.Subscribe(async (obj, args) =>
{
lastMessageReceivedDateTime = DateTime.Now;
if(args.Message.foo == 5)
{
satisfiedCompletionSource.SetResult(true);
}
streamEndCancellation.CancelAfter(1000);
});
Task<bool> actualTask = await Task.WhenAny<bool>(satisfiedCompletionSource.Task, streamEndSource.Task);
subscription?.Unsubscribe();
return !actualTask.IsCanceled;
}
问题内容: 问题 我正在通过ExecutorService运行某些外部方法的多次调用。我希望能够中断这些方法,但是不幸的是,它们不能自行检查中断标志。有什么办法可以强制从这些方法引发异常? 我知道从任意位置抛出异常是潜在的危险,在我的具体情况下,我愿意抓住这次机会并准备处理后果。 细节 “外部方法”是指某些来自外部库的方法,并且我无法修改其代码(可以,但是,每当发布新版本时,这都会成为维护的噩梦)
我正试图从同步方法运行异步方法。但是我不能等待异步方法,因为我在同步方法中。我一定不理解TPL,因为这是我第一次使用它。 每个方法都需要前一个方法来完成,因为第一个方法的数据用于第二个方法。 Await运算符只能在异步方法中使用。考虑用'async'修饰符标记此方法,并将其返回类型更改为'task' 但是,如果我使用async修饰符,这将是一个异步操作。因此,如果我对的调用没有使用await运算符
为什么下面的代码不能保证多个线程之间total_home数字的唯一性,即使逻辑处于同步块中。 } } } 这是一个程序示例。试着运行5-10次,你会发现total_home的值并不是每次都是唯一的。
问题内容: 我正在查看包含同步方法的第三方库中的一些代码,在此方法中,有一个锁定在实例变量上的同步块。与此类似: 这有意义吗?如果是这样,在同步方法中使用同步语句有什么好处? 鉴于同步方法锁定了整个对象,对我来说似乎是多余的。在使用非私有的实例变量时,这种方法是否有意义? 问题答案: 在您的示例中,该方法 同时 锁定了和的实例。其他方法可能仅锁定对象的实例 或 对象。 因此,是的,这完全取决于他们
问题内容: 由于异步方法问题,我收到一条错误消息。在我的终端中,我看到: 我注意到这是特别指出 这是我用于该部分的代码: 我试图弄清楚如何在此异步方法中使用a 。我该怎么办? 谢谢! 问题答案: 您可以在此处使用React模式来避免内存泄漏。 在您的构造函数中: } 在你的 在你里面 基于可取消承诺模式的推荐使用 Axios的 方法。因此,您可以在卸载组件的同时取消任何网络呼叫。这是Axios取消
请看下面给我带来麻烦的方法: 然后是run方法: