当前位置: 首页 > 知识库问答 >
问题:

状态机和网络套接字-如何处理竞争条件

闾丘正志
2023-03-14

我在我的C#网络项目中使用无状态,主要是因为它是一种很好的方法来添加功能,如套接字连接后的有线级授权、重新连接的延迟等。

话虽如此,但我自己却陷入了一些竞争状态和僵局--我向以下各州寻求解决问题的最佳方法:

enum State { Stopped, Disconnected, Connecting, Connected, Resetting }
enum Trigger { Start, Stop, Connect, SetConnectComplete, Reset, SetResetComplete }

class StateMachine : StateMachine<State, Trigger>
{
    public StateMachine(Action OnDisconnected, Action OnConnecting, Action OnConnected, Action OnResetting) : base(State.Stopped)
    {
        this.Configure(State.Stopped)
            .Permit(Trigger.Start, State.Disconnected);

        this.Configure(State.Disconnected)
            .OnEntry(OnDisconnected)
            .Permit(Trigger.Connect, State.Connecting);

        this.Configure(State.Connecting)
            .OnEntry(OnConnecting)
            .Permit(Trigger.SetConnectComplete, State.Connected)
            .Permit(Trigger.Reset, State.Resetting);

        this.Configure(State.Connected)
            .OnEntry(OnConnected)
            .Permit(Trigger.Reset, State.Resetting);

        this.Configure(State.Resetting)
            .OnEntry(OnResetting)
            .Permit(Trigger.SetResetComplete, State.Disconnected);
    }
}

它的功能是套接字将自动重新连接,并在连接时启动接收循环。如果出现套接字错误,它应该返回以释放资源,然后循环返回以重新启动。

然而,当我处理对象时,连接的套接字中止,这也释放了资源,并且它尝试自己等待。

我相信这是因为线程在等待自己,所以我的设计/状态结构从根本上肯定是不正确的,我很欣赏更好的结构的指针,它可以完全避免死锁。

public class ManagedWebSocket : IDisposable
{
    readonly StateMachine stateMachine;
    Task backgroundReaderTask;

    private ClientWebSocket webSocket;
    private readonly ITargetBlock<byte[]> target;
    private readonly ILogger<ManagedWebSocket> logger;
    private CancellationTokenSource cancellationTokenSource;
    bool isDisposing;

    public ManagedWebSocket(string uri, ITargetBlock<byte[]> target, ILogger<ManagedWebSocket> logger)
    {
        this.stateMachine = new StateMachine(OnDisconnected, OnConnecting, OnConnected, OnResetting);
        this.target = target;
        this.logger = logger;
    }

    private void OnConnecting()
    {
        this.backgroundReaderTask = Task.Run(async () =>
        {
            this.cancellationTokenSource = new CancellationTokenSource();
            this.webSocket = new ClientWebSocket();
            webSocket.Options.KeepAliveInterval = KeepAliveInterval;

            try
            {
                await this.webSocket.ConnectAsync(this.uri, cancellationTokenSource.Token);
            }
            catch(WebSocketException ex)
            {
                this.logger.LogError(ex.Message, ex);
                await this.stateMachine.FireAsync(Trigger.Reset);
            }

            this.stateMachine.Fire(Trigger.SetConnectComplete);
        });
    }
    
    private void OnDisconnected()
    {
        if (isDisposing == false)
            this.stateMachine.Fire(Trigger.Connect);
    }

    private void OnResetting()
    {
        FreeResources();
        this.stateMachine.Fire(Trigger.SetResetComplete);
    }

    private void OnConnected()
    {
        this.backgroundReaderTask = Task.Run( async () => {
            try
            {
                // returns when the internal frame loop completes with websocket close, or by throwing an exception
                await this.webSocket.ReceiveFramesLoopAsync(target.SendAsync, 2048, this.cancellationTokenSource.Token);
            }
            catch (Exception ex)
            {
                this.logger.LogError(ex.Message, ex);
            }

            await this.stateMachine.FireAsync(Trigger.Reset);
        });
    }

    public async Task SendAsync(byte[] data, WebSocketMessageType webSocketMessageType)
    {
        if (this.stateMachine.State != State.Connected)
            throw new Exception($"{nameof(ManagedWebSocket)} is not yet connected.");

        try
        {
            await webSocket
                    .SendAsChunksAsync(data, webSocketMessageType, 2048, this.cancellationTokenSource.Token)
                    .ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            this.logger.LogError(ex, ex.Message);
            await this.stateMachine.FireAsync(Trigger.Reset);
        }
    }

    public void Start()
    {
        this.stateMachine.Fire(Trigger.Start);
    }    

    public void FreeResources()
    {
        this.logger.LogDebug($"{nameof(ManagedWebSocket.FreeResources)}");
        this.cancellationTokenSource?.Cancel();
        this.backgroundReaderTask?.Wait();
        this.cancellationTokenSource?.Dispose();
        this.backgroundReaderTask?.Dispose();
    }

    public void Dispose()
    {
        if (isDisposing)
            return;

        isDisposing = true;
        FreeResources();
    }
}

共有1个答案

屠晟睿
2023-03-14

我猜想死锁是由于在onResetting()中调用freeResources();引起的,因为freeResources();正在等待BackgroundReaderTask但在BackgroundReaderTask中,您正在通过Await this.StateMachine.Fireasync(Trigger.Reset);等待onResetting()

作为某种变通方法,您可以省略触发重置的“await”关键字,因为它将处理整个对象。

还请注意,如果以前在onconnecting()中抛出了异常,那么似乎没有理由调用this.stateMachine.fire(Trigger.SetConnectComplete);-只需将其移动到try-block中即可。

此外,作为某种最佳实践和附带说明,请尝试遵循推荐的dispose模式

 类似资料:
  • 问题内容: 我有几个工作人员,每个工作人员都拥有与PostgreSQL的连接。工人用不同的桌子操纵。 工作人员处理来自系统外部的并行请求。被访问的表之一是用户表。当收到一些信息时,我首先需要确保表中有该用户的记录。如果没有记录,我希望首先创建一个。 我正在使用以下成语: 的代码是: 然后测试是否返回任何行。 的(简化)代码为: 当我的系统处理与 同一 用户有关 的 不同信息的并行流时,我经常会收到

  • 主要内容:锁住共享资源有并发,就有资源竞争,如果两个或者多个 goroutine 在没有相互同步的情况下,访问某个共享的资源,比如同时对该资源进行读写时,就会处于相互竞争的状态,这就是并发中的资源竞争。 并发本身并不复杂,但是因为有了资源竞争的问题,就使得我们开发出好的并发程序变得复杂起来,因为会引起很多莫名其妙的问题。 下面的代码中就会出现竞争状态: 这是一个资源竞争的例子,大家可以将程序多运行几次,会发现结果可能是

  • 9.1. 竞争条件 在一个线性(就是说只有一个goroutine的)的程序中,程序的执行顺序只由程序的逻辑来决定。例如,我们有一段语句序列,第一个在第二个之前(废话),以此类推。在有两个或更多goroutine的程序中,每一个goroutine内的语句也是按照既定的顺序去执行的,但是一般情况下我们没法去知道分别位于两个goroutine的事件x和y的执行顺序,x是在y之前还是之后还是同时发生是没法

  • 终止TCP/IP连接的最佳方法是什么,该连接清理了系统上的开放套接字,但让远程方挂起。那就是我想要关闭一个插座的方式,对我来说是最低的成本,但对他们来说是最高的成本。 @尼古拉斯·威尔逊: 使用tcprepair似乎是个好主意。当套接字在TCP_REPAIR模式下关闭时,不会发送FIN或RST数据包。远程插座挂起。我要去试试看然后回去报告。下面是我的(未经测试的)代码: 只是不要关闭它。打开的插座

  • 9.6. 竞争条件检测 即使我们小心到不能再小心,但在并发程序中犯错还是太容易了。幸运的是,Go的runtime和工具链为我们装备了一个复杂但好用的动态分析工具,竞争检查器(the race detector)。 只要在go build,go run或者go test命令后面加上-race的flag,就会使编译器创建一个你的应用的“修改”版或者一个附带了能够记录所有运行期对共享变量访问工具的tes