当前位置: 首页 > 知识库问答 >
问题:

为什么平行。Foreach创建无尽的线程?

严易安
2023-03-14

下面的代码继续创建线程,即使队列是空的...直到最终发生OutOfMemory异常。如果我用一个常规的foreach替换并行。Foreach,这不会发生。有人知道为什么会发生这种情况吗?

public delegate void DataChangedDelegate(DataItem obj);

public class Consumer
{
    public DataChangedDelegate OnCustomerChanged;
    public DataChangedDelegate OnOrdersChanged;

    private CancellationTokenSource cts;
    private CancellationToken ct;
    private BlockingCollection<DataItem> queue;

    public Consumer(BlockingCollection<DataItem> queue) {
        this.queue = queue;
        Start();
    }

    private void Start() {
        cts = new CancellationTokenSource();
        ct = cts.Token;
        Task.Factory.StartNew(() => DoWork(), ct);
    }

    private void DoWork() {

        Parallel.ForEach(queue.GetConsumingPartitioner(), item => {
            if (item.DataType == DataTypes.Customer) {
                OnCustomerChanged(item);
            } else if(item.DataType == DataTypes.Order) {
                OnOrdersChanged(item);
            }
        });
    }
}

共有3个答案

程鸿波
2023-03-14

在任务并行库内部,并行。For和Parallel。Foreach遵循爬山算法来确定操作应使用多少并行性。

或多或少,他们从一个任务开始运行身体,移动到两个,等等,直到达到一个断点,他们需要减少任务数量。

这对于快速完成的方法体非常有效,但如果方法体需要很长时间才能运行,则可能需要很长时间才能意识到需要减少并行量。在此之前,它会继续添加任务,并可能导致计算机崩溃。

我在任务并行库的一位开发人员的演讲中学习了上述内容。

指定MaxDegreeOfParallelism可能是最简单的方法。

聂和宜
2023-03-14

生产者/消费者模式主要用于只有一个生产者和一个消费者的情况。

但是,您试图实现的(多个消费者)更适合Worklist模式。以下代码摘自unit2幻灯片“2c-共享内存模式”的幻灯片,来自犹他大学教授的并行编程课程,可在http://ppcp.codeplex.com/下载

BlockingCollection<Item> workList;
CancellationTokenSource cts;
int itemcount

public void Run()
{
  int num_workers = 4;

  //create worklist, filled with initial work
  worklist = new BlockingCollection<Item>(
    new ConcurrentQueue<Item>(GetInitialWork()));

  cts = new CancellationTokenSource();
  itemcount = worklist.Count();

  for( int i = 0; i < num_workers; i++)
    Task.Factory.StartNew( RunWorker );
}

IEnumberable<Item> GetInitialWork() { ... }

public void RunWorker() {
  try  {
    do {
      Item i = worklist.Take( cts.Token );
      //blocks until item available or cancelled
          Process(i);
      //exit loop if no more items left
    } while (Interlocked.Decrement( ref itemcount) > 0);
  } finally {
      if( ! cts.IsCancellationRequested )
        cts.Cancel();
    }
  }
}

public void AddWork( Item item) {
  Interlocked.Increment( ref itemcount );
  worklist.Add(item);
}

public void Process( Item i ) 
{
  //Do what you want to the work item here.
}

前面的代码允许您将工作列表项添加到队列中,并允许您设置任意数量的worker(在本例中为四个)以将项目从队列中拉出并处理它们。

关于.Net 4.0上的并行性的另一个重要资源是《使用Microsoft.Net进行并行编程》一书,该书可在以下网站免费获得:http://msdn.microsoft.com/en-us/library/ff963553

酆光熙
2023-03-14

我认为并行。Foreach()主要是为了处理有界集合而制作的。它不期望像Get消耗分区器()返回的集合那样,其中MoveNext()长时间阻塞。

问题是并行。ForEach()试图找到最佳并行度,因此它启动的任务数量与任务调度程序运行的数量相同。但是任务调度器(TaskScheduler)发现有许多任务需要很长时间才能完成,而且它们什么都没有做(它们会阻塞),因此它会继续启动新的任务。

我认为最好的解决方案是设置MaxDegreeOfParallelism

作为替代方案,您可以使用TPL数据流的ActionBlock。这种情况下的主要区别是,当没有要处理的项目时,ActionBlock不会阻止任何线程,因此线程的数量不会接近极限。

 类似资料:
  • 我试图通过拆分列表来检查元素是否在字符串中: `公共类ParallelSearchComment扩展了RecursiveTask{private static final long serialVersionUID=1L; `

  • 我编写了代码示例: 每100毫秒提交一个新任务(总任务量-20)。每个任务持续时间-0.5秒。因此,可以并行执行5个任务,最佳执行时间为:20*100 500=2.5秒,池应创建5个线程 但我的实验显示为9.6秒。我打开jsvisualvm查看池创建了多少线程,我看到只创建了一个线程: 请更正我的线程池配置不正确的地方。

  • 问题内容: 创建线程很昂贵。但是为什么价格昂贵呢?当创建Java线程使创建过程变得昂贵时,究竟发生了什么?我认为该说法是正确的,但是我只是对JVM中的线程创建机制感兴趣。 线程生命周期开销。线程创建和拆除不是免费的。实际开销因平台而异,但是线程创建会花费时间,从而在请求处理中引入延迟,并且需要JVM和OS进行某些处理活动。如果请求是频繁且轻量的(如在大多数服务器应用程序中一样),则为每个请求创建一

  • 问题内容: SQLAlchemy正在生成但未启用Postgresql中列的序列。我怀疑我在引擎设置中可能做错了什么。 使用SQLAlchemy教程(http://docs.sqlalchemy.org/en/rel_0_9/orm/tutorial.html)中的示例: 使用此脚本,将生成下表: 然而,序列 被 创建: SQLAlchemy 0.9.1,Python 2.7.5 +,Postgre

  • 问题内容: 我了解JSON,但不了解JSONP。Wikipedia上有关JSON的文档是JSONP的最高搜索结果。它说: JSONP或“带填充的JSON”是JSON扩展,其中将前缀指定为调用本身的输入参数。 ??什么电话 这对我来说毫无意义。JSON是一种数据格式。没有电话 在第二个搜索结果是由某些人叫雷米,谁写的这个约JSONP: JSONP是脚本标记注入,它将响应从服务器传递到用户指定的函数。

  • 我正在构建一个软件平台,作为概念验证(并确定它需要什么),我正在使用此处列出的我的平台存储库构建一个名为Telegram RP的即时通讯程序:https://github.com/BlueHuskyStudios/Blue-Husky-Software-Platform 现在,我已经决定将特定于JVM的代码与通用代码分开,但在我这样做的第一步,它将无法编译。 我被难住了。我已经三次检查了语言级别的