当前位置: 首页 > 知识库问答 >
问题:

限制生产者/消费者模式导致死锁

姚晋
2023-03-14

本周早些时候,我在这里获得了一些关于Stackoverflow的帮助,这导致了一个生产者/消费者模式的发展,用于加载处理并将大型数据集导入RavenDb。CPU受限任务的并行化与IO受限任务的并行化

我现在希望限制生产商提前准备的工作单元的数量,以管理内存消耗。我已经使用一个基本信号量实现了节流,但在某个点上实现死锁时遇到了问题。

我无法找出导致死锁的原因。以下是代码摘录:

private static void LoadData<TParsedData, TData>(IDataLoader<TParsedData> dataLoader, int batchSize, Action<IndexedBatch<TData>> importProceedure, Func<IEnumerable<TParsedData>, List<TData>> processProceedure)
    where TParsedData : class
    where TData : class
{
    Console.WriteLine(@"Loading {0}...", typeof(TData).ToString());

    var batchCounter = 0;

    var ist = Stopwatch.StartNew();

    var throttler = new SemaphoreSlim(10);
    var bc = new BlockingCollection<IndexedBatch<TData>>();
    var importTask = Task.Run(() =>
    {
        bc.GetConsumingEnumerable()
            .AsParallel()
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            //or
            //.WithDegreeOfParallelism(1)
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .ForAll(data =>
            {
                var st = Stopwatch.StartNew();
                importProceedure(data);

                Console.WriteLine(@"Batch imported {0} in {1} ms", data.Index, st.ElapsedMilliseconds);
                throttler.Release();
            });
    });
    var processTask = Task.Run(() =>
    {
        dataLoader.GetParsedItems()
            .Partition(batchSize)
            .AsParallel()
            .WithDegreeOfParallelism(Environment.ProcessorCount)
            //or
            //.WithDegreeOfParallelism(1)
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .ForAll(batch =>
            {
                throttler.Wait(); //.WaitAsync()
                var batchno = ++batchCounter;
                var st = Stopwatch.StartNew();

                bc.Add(new IndexedBatch<TData>(batchno, processProceedure(batch)));

                Console.WriteLine(@"Batch processed {0} in {1} ms", batchno, st.ElapsedMilliseconds);
            });
    });

    processTask.Wait();
    bc.CompleteAdding();
    importTask.Wait();

    Console.WriteLine(nl(1) + @"Loading {0} completed in {1} ms", typeof(TData).ToString(), ist.ElapsedMilliseconds);
}

public class IndexedBatch<TBatch> 
    where TBatch : class
{
    public IndexedBatch(int index, List<TBatch> batch)
    {
        Index = index;
        Batch = batch ?? new List<TBatch>();
    }

    public int Index { get; set; }
    public List<TBatch> Batch { get; set; }
}

这是对LoadData的调用:

LoadData<DataBase, Data>(
    DataLoaderFactory.Create<DataBase>(datafilePath),
    1024,
    (data) =>
    {
        using (var session = Store.OpenSession())
        {
            foreach (var i in data.Batch)
            {
                session.Store(i);
                d.TryAdd(i.LongId.GetHashCode(), int.Parse(i.Id.Substring(i.Id.LastIndexOf('/') + 1)));
            }
            session.SaveChanges();
        }
    },
    (batch) =>
    {
        return batch.Select(i => new Data()
        {
            ...
        }).ToList();
    }
);

商店是RavenDb IDocumentStore。DataLoaderFactory为给定数据集构造一个自定义解析器。

共有2个答案

崔宇
2023-03-14

你能检查一下你有多少线程吗?可能由于阻塞而耗尽了线程池。如果TPL认为没有线程,您的代码就会死锁,那么它注入的线程比ProcessorCount多。但它只能做到一定限度。

无论如何,在TPL任务中阻塞通常是一个坏主意,因为内置启发式算法最适合非阻塞的东西。

蒋权
2023-03-14

如果没有表示“在这里阻塞!”的大箭头,很难调试死锁。避免在没有调试器的情况下调试代码:BlockingCollection已经可以限制。使用采用int bounded容量参数的构造函数并消除信号量。解决死锁的可能性非常高。

 类似资料:
  • 一、线程间通信的两种方式 1.wait()/notify() Object类中相关的方法有notify方法和wait方法。因为wait和notify方法定义在Object类中,因此会被所有的类所继承。这些方法都是final的,即它们都是不能被重写的,不能通过子类覆写去改变它们的行为。 ①wait()方法: 让当前线程进入等待,并释放锁。 ②wait(long)方法: 让当前线程进入等待,并释放锁,

  • 生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f

  • 我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?

  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职

  • 我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统