当前位置: 首页 > 知识库问答 >
问题:

如何在同一时间大量排队和出队?

益英逸
2023-03-14

我有一个简单的场景,有两个线程,第一个线程永久读取一些数据,并将这些数据排入队列。第二个线程首先从队列中查看单个对象,并进行一些条件检查。如果这些都是好的,单个对象将被出列并传递给一些处理。

我尝试过使用< code>ConcurrentQueue,这是一个简单队列的线程安全实现,但是这个队列的问题是所有调用都被阻塞了。这意味着,如果第一个线程正在将一个对象入队,第二个线程就不能查看对象或将其出队。

在我的情况下,我需要在队列的末尾排队,同时从队列的开始排队。

C#的锁语句也会。

所以我的问题是,是否可以在不以线程安全的方式相互阻塞的情况下并行执行这两个操作。

这些是我的第一次尝试,这是一个类似的例子。

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Scenario {
    public class Program {
        public static void Main(string[] args) {
            Scenario scenario = new Scenario();
            scenario.Start();
            Console.ReadKey();
        }

        public class Scenario {
            public Scenario() {
                someData = new Queue<int>();
            }

            public void Start() {
                Task.Factory.StartNew(firstThread);
                Task.Factory.StartNew(secondThread);
            }

            private void firstThread() {
                Random random = new Random();
                while (true) {
                    int newData = random.Next(1, 100);
                    someData.Enqueue(newData);
                    Console.WriteLine("Enqueued " + newData);
                }
            }

            private void secondThread() {
                Random random = new Random();
                while (true) {
                    if (someData.Count == 0) {
                        continue;
                    }

                    int singleData = someData.Peek();

                    int someValue = random.Next(1, 100);
                    if (singleData > someValue || singleData == 1 || singleData == 99) {
                        singleData = someData.Dequeue();
                        Console.WriteLine("Dequeued " + singleData);
                        // ... processing ...
                    }
                }
            }

            private readonly Queue<int> someData;
        }
    }
}

第二个例子:

public class Scenario {
    public Scenario() {
        someData = new ConcurrentQueue<int>();
    }

    public void Start() {
        Task.Factory.StartNew(firstThread);
        Task.Factory.StartNew(secondThread);
    }

    private void firstThread() {
        Random random = new Random();
        while (true) {
            int newData = random.Next(1, 100);
            someData.Enqueue(newData);
            lock (syncRoot) { Console.WriteLine($"Enqued {enqued++} Dequed {dequed}"); }
        }
    }

    private void secondThread() {
        Random random = new Random();
        while (true) {
            if (!someData.TryPeek(out int singleData)) {
                continue;
            }

            int someValue = random.Next(1, 100);
            if (singleData > someValue || singleData == 1 || singleData == 99) {
                if (!someData.TryDequeue(out singleData)) {
                    continue;
                }

                lock (syncRoot) { Console.WriteLine($"Enqued {enqued} Dequed {dequed++}"); }

                // ... processing ...
            }
        }
    }

    private int enqued = 0;
    private int dequed = 0;

    private readonly ConcurrentQueue<int> someData;

    private static readonly object syncRoot = new object();
}

共有1个答案

荆树
2023-03-14

首先:我强烈建议您重新考虑使用多线程和共享内存数据结构的技术是否是正确的方法。具有多个控制线程共享访问数据结构的代码很难正确,而且失败可能是微妙的、灾难性的,并且很难调试。

第二:如果您热衷于多线程和共享内存数据结构,我强烈建议您使用专家设计的数据类型,如并发队列,而不是滚动自己的数据类型。

现在我已经解决了这些警告:这是解决您问题的一种方法。这非常复杂,您应该获得 C# 内存模型专家的服务,以验证解决方案的正确性(如果使用)。我不认为自己有能力实现我将要描述的方案,如果没有记忆模型专家的帮助。

目标是拥有一个支持同时排队和取消排队操作以及低锁争用的队列。

您需要的是两个不可变的堆栈变量,称为入队堆栈和出队堆栈,每个变量都有自己的锁。

入队操作是:

  • 使用排队锁
  • 将项目推到排队堆栈上;这在O(1)时间内产生新的堆栈
  • 将新生成的堆栈分配给排队堆栈变量
  • 释放排队锁

出队操作是:

  • 取出列锁
  • 如果出列堆栈为空,则
    • 使用排队锁
    • 枚举入队堆栈并使用它来构建出队堆栈;这反转了入队堆栈,它维护了我们想要的属性:先进先出。
      < li >将一个空的不可变堆栈赋给入队堆栈变量
      < li >释放入队锁
      < li >将新堆栈分配给出列堆栈

    请注意,当然,如果只有一个线程出列,那么我们根本不需要出列锁,但使用此方案,可以有许多线程出列。

    假设入队堆栈上有1000个项目,出队堆栈上没有。当我们第一次出队时,我们做了一个昂贵的O(n)操作,将入队堆栈反转一次,但现在出队堆栈上有1000个项目。一旦出队堆栈很大,出队线程可以花费大部分时间进行处理,而入队线程则花费大部分时间排队。排队锁上的争用很少,但发生时代价高昂。

    为什么要使用不可变的数据结构?我在这里描述的所有内容也适用于可变堆栈,但是 (1) 推理不可变堆栈更容易,(2) 如果你想真正危险地生活,你可以避开一些锁并进行互锁交换操作;如果您这样做,请确保您了解有关在低锁定条件下可能重新排序操作的所有信息。

    更新:

    真正的问题是我无法取消排队和处理很多点,因为我正在永久阅读和查询新点。排队调用会阻止处理步骤。

    如果这是你的真正问题,那么在问题中提到它而不是在评论中埋没它将是一个好主意。帮助我们帮助你。

    你可以在这里做很多事情。例如,您可以将排队线程的优先级设置为低于出列线程的优先级。或者,您可以有多个出队列程线程,与计算机中的 CPU 数量一样多。或者,如果取消排队跟上,您可以动态选择删除一些排队操作。如果不更多地了解您的实际问题,就很难就如何解决它提供建议。

 类似资料:
  • 我目前正在开发一个Twilio排队系统,但我被困在如何让电话排队并连接到代理上。 我把所有的电话都排在队列中,我的理解是,我们需要呼叫出队列,下面是twilio网站上可用的示例响应。 但是,当有人打电话给twilio时,这可以作为对twilio的回应发送。 那么,特工应该拨打twilio号码吗?如果是这样的话,我们是否需要为这个案例保留一个内部号码,除了我们为来电提供的支持号码? 谢谢你,巴斯卡。

  • 对于使用c实现的链表队列,我的入队列和出队列有点问题。我的老师说模板是禁止使用的,我不能改变他给我们的公共和私人功能。我总是遇到一个分割错误。我真的不明白我做错了什么。我还包括了header、enqueue和dequeue函数。

  • 我需要在Anylogic中模拟一个简单的M/M/1问题。到目前为止,我创建了模型并计算了所有性能度量,如队列和系统中的平均时间以及队列和系统中的平均数量。现在我需要计算总成本。汽车的喷漆时间为6小时,每小时70美元。每辆车的闲置时间成本为每小时100美元。车辆按照泊松过程到达,平均速度为每5小时1辆。有人能帮我用annylogic计算这个模型的总成本吗? 在此处输入图像描述

  • 我目前正在读一本关于数据结构/算法的教科书。其中一个练习是使用python列表结构实现一个高效的队列:入队和出队的时间复杂度平均需要为O(1)。书上说时间复杂度应该只有出队的一个具体情况是O(n),其余时间应该是O(1)。我实现了它,使得队列的后面是列表的末尾,而队列的前面是列表的开头;当我让一个元素出队时,我并没有从列表中删除它,而是简单地增加一个计数器,这样方法就知道列表中的哪个元素代表队列的

  • 问题内容: 我已经使用JavaFX制作了GUI,并且有三个单选按钮,一旦用户单击“提交”并创建了另一个线程,并且根据检查的单选按钮,该线程将运行所需的输出并将结果输出到控制台。 但是,当线程正在运行时(一个过程大约需要30秒才能完成),我可以检查任何单选按钮。为此,它创建另一个线程,并与其他正在进行的线程长时间输出。所以我的输出盒简直是一团糟!我正在查看 异步 任务,但不确定是否与此相关。 这是我

  • 我有一个web服务,它接收JSON格式的数据,处理数据,然后将结果返回给请求者。 我想使用度量请求、响应和总时间。 我的示例请求如下所示: 目前我在Linux中使用命令来度量这个值: 但是time命令只测量总时间--这不是我想要的。 是否有任何方法可以使用来度量请求和响应时间?